การทำงานพร้อมกันใน Python - กลุ่มของเธรด

สมมติว่าเราต้องสร้างเธรดจำนวนมากสำหรับงานมัลติเธรดของเรา มันจะแพงที่สุดในการคำนวณเนื่องจากอาจมีปัญหาด้านประสิทธิภาพหลายประการเนื่องจากมีเธรดมากเกินไป ปัญหาสำคัญอาจอยู่ที่ปริมาณงานที่ จำกัด เราสามารถแก้ปัญหานี้ได้โดยการสร้างกลุ่มของเธรด เธรดพูลอาจถูกกำหนดให้เป็นกลุ่มของเธรดที่สร้างอินสแตนซ์ล่วงหน้าและเธรดที่ไม่ได้ใช้งานซึ่งพร้อมสำหรับการทำงาน การสร้างเธรดพูลเป็นที่ต้องการมากกว่าการสร้างอินสแตนซ์เธรดใหม่สำหรับทุกงานเมื่อเราต้องการทำงานจำนวนมาก เธรดพูลสามารถจัดการการดำเนินการพร้อมกันของเธรดจำนวนมากได้ดังนี้ -

  • หากเธรดในเธรดพูลเสร็จสิ้นการดำเนินการเธรดนั้นสามารถนำมาใช้ซ้ำได้

  • หากเธรดถูกยกเลิกเธรดอื่นจะถูกสร้างขึ้นเพื่อแทนที่เธรดนั้น

โมดูล Python - Concurrent.futures

ไลบรารีมาตรฐาน Python รวมถึงไฟล์ concurrent.futuresโมดูล. โมดูลนี้ถูกเพิ่มใน Python 3.2 เพื่อให้นักพัฒนามีอินเทอร์เฟซระดับสูงสำหรับการเรียกใช้งานแบบอะซิงโครนัส เป็นเลเยอร์นามธรรมที่ด้านบนของโมดูลเธรดและการประมวลผลหลายกระบวนการของ Python สำหรับจัดเตรียมอินเทอร์เฟซสำหรับรันงานโดยใช้กลุ่มเธรดหรือกระบวนการ

ในส่วนต่อไปเราจะเรียนรู้เกี่ยวกับคลาสต่างๆของโมดูล concurrent.futures

ระดับผู้บริหาร

Executorเป็นคลาสนามธรรมของ concurrent.futuresโมดูล Python ไม่สามารถใช้ได้โดยตรงและเราจำเป็นต้องใช้หนึ่งในคลาสย่อยคอนกรีตต่อไปนี้ -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor - คลาสย่อยคอนกรีต

มันเป็นหนึ่งในคลาสย่อยที่เป็นรูปธรรมของคลาส Executor คลาสย่อยใช้มัลติเธรดและเราได้รับกลุ่มเธรดสำหรับการส่งงาน พูลนี้มอบหมายงานให้กับเธรดที่มีและกำหนดเวลาให้รัน

จะสร้าง ThreadPoolExecutor ได้อย่างไร?

ด้วยความช่วยเหลือของ concurrent.futures โมดูลและคลาสย่อยคอนกรีต Executorเราสามารถสร้างกลุ่มของเธรดได้อย่างง่ายดาย สำหรับสิ่งนี้เราจำเป็นต้องสร้างไฟล์ThreadPoolExecutorด้วยจำนวนเธรดที่เราต้องการในพูล ตามค่าเริ่มต้นจำนวนคือ 5 จากนั้นเราสามารถส่งงานไปยังเธรดพูล เมื่อเราsubmit() งานเราจะกลับมา Future. วัตถุในอนาคตมีวิธีการที่เรียกว่าdone()ซึ่งจะบอกว่าอนาคตคลี่คลายหรือไม่ ด้วยเหตุนี้จึงมีการตั้งค่าสำหรับวัตถุในอนาคตนั้น ๆ เมื่องานเสร็จสิ้นตัวดำเนินการเธรดพูลจะตั้งค่าเป็นอ็อบเจ็กต์ในอนาคต

ตัวอย่าง

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

เอาต์พุต

False
True
Completed

ในตัวอย่างข้างต้น a ThreadPoolExecutorถูกสร้างขึ้นด้วย 5 เธรด จากนั้นงานซึ่งจะรอ 2 วินาทีก่อนที่จะส่งข้อความจะถูกส่งไปยังตัวดำเนินการเธรดพูล ดังที่เห็นจากผลลัพธ์งานจะไม่เสร็จสมบูรณ์จนกว่าจะถึง 2 วินาทีดังนั้นการโทรครั้งแรกไปที่done()จะส่งคืน False หลังจาก 2 วินาทีงานจะเสร็จสิ้นและเราจะได้รับผลลัพธ์ของอนาคตโดยเรียกไฟล์result() วิธีการกับมัน

การสร้างอินสแตนซ์ ThreadPoolExecutor - ตัวจัดการบริบท

อีกวิธีหนึ่งในการสร้างอินสแตนซ์ ThreadPoolExecutorด้วยความช่วยเหลือของผู้จัดการบริบท ทำงานคล้ายกับวิธีการที่ใช้ในตัวอย่างข้างต้น ข้อได้เปรียบหลักของการใช้ตัวจัดการบริบทคือมันดูดีในเชิงไวยากรณ์ การสร้างอินสแตนซ์สามารถทำได้ด้วยความช่วยเหลือของรหัสต่อไปนี้ -

with ThreadPoolExecutor(max_workers = 5) as executor

ตัวอย่าง

ตัวอย่างต่อไปนี้ยืมมาจากเอกสาร Python ในตัวอย่างนี้ก่อนอื่นconcurrent.futuresต้องนำเข้าโมดูล จากนั้นฟังก์ชั่นที่ชื่อว่าload_url()ถูกสร้างขึ้นซึ่งจะโหลด url ที่ร้องขอ จากนั้นฟังก์ชันจะสร้างThreadPoolExecutorด้วย 5 เธรดในพูล ThreadPoolExecutorถูกใช้เป็นตัวจัดการบริบท เราสามารถรับผลลัพธ์ของอนาคตได้โดยเรียกไฟล์result() วิธีการกับมัน

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

เอาต์พุต

ต่อไปนี้จะเป็นผลลัพธ์ของสคริปต์ Python ด้านบน -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

การใช้ฟังก์ชัน Executor.map ()

Python map()ฟังก์ชันนี้ใช้กันอย่างแพร่หลายในหลาย ๆ งาน ภารกิจอย่างหนึ่งคือการใช้ฟังก์ชันบางอย่างกับทุกองค์ประกอบภายในการวนซ้ำ ในทำนองเดียวกันเราสามารถแมปองค์ประกอบทั้งหมดของตัววนซ้ำกับฟังก์ชันและส่งงานเหล่านี้ให้เป็นงานอิสระThreadPoolExecutor. พิจารณาตัวอย่างต่อไปนี้ของสคริปต์ Python เพื่อทำความเข้าใจว่าฟังก์ชันทำงานอย่างไร

ตัวอย่าง

ในตัวอย่างด้านล่างนี้ฟังก์ชันแผนที่ถูกใช้เพื่อใช้ square() ฟังก์ชันกับทุกค่าในอาร์เรย์ค่า

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

เอาต์พุต

สคริปต์ Python ข้างต้นสร้างผลลัพธ์ต่อไปนี้ -

4
9
16
25