การทำงานพร้อมกันใน Python - กลุ่มของเธรด
สมมติว่าเราต้องสร้างเธรดจำนวนมากสำหรับงานมัลติเธรดของเรา มันจะแพงที่สุดในการคำนวณเนื่องจากอาจมีปัญหาด้านประสิทธิภาพหลายประการเนื่องจากมีเธรดมากเกินไป ปัญหาสำคัญอาจอยู่ที่ปริมาณงานที่ จำกัด เราสามารถแก้ปัญหานี้ได้โดยการสร้างกลุ่มของเธรด เธรดพูลอาจถูกกำหนดให้เป็นกลุ่มของเธรดที่สร้างอินสแตนซ์ล่วงหน้าและเธรดที่ไม่ได้ใช้งานซึ่งพร้อมสำหรับการทำงาน การสร้างเธรดพูลเป็นที่ต้องการมากกว่าการสร้างอินสแตนซ์เธรดใหม่สำหรับทุกงานเมื่อเราต้องการทำงานจำนวนมาก เธรดพูลสามารถจัดการการดำเนินการพร้อมกันของเธรดจำนวนมากได้ดังนี้ -
หากเธรดในเธรดพูลเสร็จสิ้นการดำเนินการเธรดนั้นสามารถนำมาใช้ซ้ำได้
หากเธรดถูกยกเลิกเธรดอื่นจะถูกสร้างขึ้นเพื่อแทนที่เธรดนั้น
โมดูล Python - Concurrent.futures
ไลบรารีมาตรฐาน Python รวมถึงไฟล์ concurrent.futuresโมดูล. โมดูลนี้ถูกเพิ่มใน Python 3.2 เพื่อให้นักพัฒนามีอินเทอร์เฟซระดับสูงสำหรับการเรียกใช้งานแบบอะซิงโครนัส เป็นเลเยอร์นามธรรมที่ด้านบนของโมดูลเธรดและการประมวลผลหลายกระบวนการของ Python สำหรับจัดเตรียมอินเทอร์เฟซสำหรับรันงานโดยใช้กลุ่มเธรดหรือกระบวนการ
ในส่วนต่อไปเราจะเรียนรู้เกี่ยวกับคลาสต่างๆของโมดูล concurrent.futures
ระดับผู้บริหาร
Executorเป็นคลาสนามธรรมของ concurrent.futuresโมดูล Python ไม่สามารถใช้ได้โดยตรงและเราจำเป็นต้องใช้หนึ่งในคลาสย่อยคอนกรีตต่อไปนี้ -
- ThreadPoolExecutor
- ProcessPoolExecutor
ThreadPoolExecutor - คลาสย่อยคอนกรีต
มันเป็นหนึ่งในคลาสย่อยที่เป็นรูปธรรมของคลาส Executor คลาสย่อยใช้มัลติเธรดและเราได้รับกลุ่มเธรดสำหรับการส่งงาน พูลนี้มอบหมายงานให้กับเธรดที่มีและกำหนดเวลาให้รัน
จะสร้าง ThreadPoolExecutor ได้อย่างไร?
ด้วยความช่วยเหลือของ concurrent.futures โมดูลและคลาสย่อยคอนกรีต Executorเราสามารถสร้างกลุ่มของเธรดได้อย่างง่ายดาย สำหรับสิ่งนี้เราจำเป็นต้องสร้างไฟล์ThreadPoolExecutorด้วยจำนวนเธรดที่เราต้องการในพูล ตามค่าเริ่มต้นจำนวนคือ 5 จากนั้นเราสามารถส่งงานไปยังเธรดพูล เมื่อเราsubmit() งานเราจะกลับมา Future. วัตถุในอนาคตมีวิธีการที่เรียกว่าdone()ซึ่งจะบอกว่าอนาคตคลี่คลายหรือไม่ ด้วยเหตุนี้จึงมีการตั้งค่าสำหรับวัตถุในอนาคตนั้น ๆ เมื่องานเสร็จสิ้นตัวดำเนินการเธรดพูลจะตั้งค่าเป็นอ็อบเจ็กต์ในอนาคต
ตัวอย่าง
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ThreadPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
เอาต์พุต
False
True
Completed
ในตัวอย่างข้างต้น a ThreadPoolExecutorถูกสร้างขึ้นด้วย 5 เธรด จากนั้นงานซึ่งจะรอ 2 วินาทีก่อนที่จะส่งข้อความจะถูกส่งไปยังตัวดำเนินการเธรดพูล ดังที่เห็นจากผลลัพธ์งานจะไม่เสร็จสมบูรณ์จนกว่าจะถึง 2 วินาทีดังนั้นการโทรครั้งแรกไปที่done()จะส่งคืน False หลังจาก 2 วินาทีงานจะเสร็จสิ้นและเราจะได้รับผลลัพธ์ของอนาคตโดยเรียกไฟล์result() วิธีการกับมัน
การสร้างอินสแตนซ์ ThreadPoolExecutor - ตัวจัดการบริบท
อีกวิธีหนึ่งในการสร้างอินสแตนซ์ ThreadPoolExecutorด้วยความช่วยเหลือของผู้จัดการบริบท ทำงานคล้ายกับวิธีการที่ใช้ในตัวอย่างข้างต้น ข้อได้เปรียบหลักของการใช้ตัวจัดการบริบทคือมันดูดีในเชิงไวยากรณ์ การสร้างอินสแตนซ์สามารถทำได้ด้วยความช่วยเหลือของรหัสต่อไปนี้ -
with ThreadPoolExecutor(max_workers = 5) as executor
ตัวอย่าง
ตัวอย่างต่อไปนี้ยืมมาจากเอกสาร Python ในตัวอย่างนี้ก่อนอื่นconcurrent.futuresต้องนำเข้าโมดูล จากนั้นฟังก์ชั่นที่ชื่อว่าload_url()ถูกสร้างขึ้นซึ่งจะโหลด url ที่ร้องขอ จากนั้นฟังก์ชันจะสร้างThreadPoolExecutorด้วย 5 เธรดในพูล ThreadPoolExecutorถูกใช้เป็นตัวจัดการบริบท เราสามารถรับผลลัพธ์ของอนาคตได้โดยเรียกไฟล์result() วิธีการกับมัน
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
เอาต์พุต
ต่อไปนี้จะเป็นผลลัพธ์ของสคริปต์ Python ด้านบน -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes
การใช้ฟังก์ชัน Executor.map ()
Python map()ฟังก์ชันนี้ใช้กันอย่างแพร่หลายในหลาย ๆ งาน ภารกิจอย่างหนึ่งคือการใช้ฟังก์ชันบางอย่างกับทุกองค์ประกอบภายในการวนซ้ำ ในทำนองเดียวกันเราสามารถแมปองค์ประกอบทั้งหมดของตัววนซ้ำกับฟังก์ชันและส่งงานเหล่านี้ให้เป็นงานอิสระThreadPoolExecutor. พิจารณาตัวอย่างต่อไปนี้ของสคริปต์ Python เพื่อทำความเข้าใจว่าฟังก์ชันทำงานอย่างไร
ตัวอย่าง
ในตัวอย่างด้านล่างนี้ฟังก์ชันแผนที่ถูกใช้เพื่อใช้ square() ฟังก์ชันกับทุกค่าในอาร์เรย์ค่า
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ThreadPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
เอาต์พุต
สคริปต์ Python ข้างต้นสร้างผลลัพธ์ต่อไปนี้ -
4
9
16
25