การทำงานพร้อมกันใน Python - กลุ่มของกระบวนการ
กลุ่มของกระบวนการสามารถสร้างและใช้ในลักษณะเดียวกับที่เราสร้างและใช้กลุ่มของเธรด พูลกระบวนการสามารถกำหนดเป็นกลุ่มของกระบวนการที่สร้างอินสแตนซ์ล่วงหน้าและไม่ได้ใช้งานซึ่งพร้อมที่จะรับงาน การสร้างพูลกระบวนการเป็นที่ต้องการมากกว่าการสร้างอินสแตนซ์กระบวนการใหม่สำหรับทุกงานเมื่อเราจำเป็นต้องทำงานจำนวนมาก
โมดูล Python - Concurrent.futures
ไลบรารีมาตรฐาน Python มีโมดูลที่เรียกว่า concurrent.futures. โมดูลนี้ถูกเพิ่มใน Python 3.2 เพื่อให้นักพัฒนามีอินเทอร์เฟซระดับสูงสำหรับการเรียกใช้งานแบบอะซิงโครนัส เป็นเลเยอร์นามธรรมที่ด้านบนของโมดูลเธรดและการประมวลผลหลายส่วนของ Python สำหรับจัดเตรียมอินเทอร์เฟซสำหรับรันงานโดยใช้กลุ่มเธรดหรือกระบวนการ
ในส่วนต่อไปเราจะดูคลาสย่อยที่แตกต่างกันของโมดูล concurrent.futures
ระดับผู้บริหาร
Executor เป็นคลาสนามธรรมของ concurrent.futuresโมดูล Python ไม่สามารถใช้ได้โดยตรงและเราจำเป็นต้องใช้หนึ่งในคลาสย่อยคอนกรีตต่อไปนี้ -
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor - คลาสย่อยที่เป็นรูปธรรม
มันเป็นหนึ่งในคลาสย่อยที่เป็นรูปธรรมของคลาส Executor มันใช้การประมวลผลหลายขั้นตอนและเราได้รับกลุ่มของกระบวนการในการส่งงาน พูลนี้มอบหมายงานให้กับกระบวนการที่มีและกำหนดเวลาให้ทำงาน
จะสร้าง ProcessPoolExecutor ได้อย่างไร?
ด้วยความช่วยเหลือของ concurrent.futures โมดูลและคลาสย่อยคอนกรีต Executorเราสามารถสร้างกลุ่มของกระบวนการได้อย่างง่ายดาย สำหรับสิ่งนี้เราจำเป็นต้องสร้างไฟล์ProcessPoolExecutorด้วยจำนวนกระบวนการที่เราต้องการในพูล ตามค่าเริ่มต้นตัวเลขคือ 5 ตามด้วยการส่งงานไปยังพูลกระบวนการ
ตัวอย่าง
ตอนนี้เราจะพิจารณาตัวอย่างเดียวกับที่เราใช้ในขณะสร้างเธรดพูลข้อแตกต่างเพียงอย่างเดียวคือตอนนี้เราจะใช้ ProcessPoolExecutor แทน ThreadPoolExecutor .
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ProcessPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
เอาต์พุต
False
False
Completed
ในตัวอย่างข้างต้นกระบวนการPoolExecutorถูกสร้างขึ้นด้วย 5 เธรด จากนั้นงานซึ่งจะรอ 2 วินาทีก่อนที่จะส่งข้อความจะถูกส่งไปยังตัวดำเนินการพูลกระบวนการ ดังที่เห็นจากผลลัพธ์งานจะไม่เสร็จสมบูรณ์จนกว่าจะถึง 2 วินาทีดังนั้นการโทรครั้งแรกไปที่done()จะส่งคืน False หลังจาก 2 วินาทีงานจะเสร็จสิ้นและเราจะได้รับผลลัพธ์ของอนาคตโดยเรียกไฟล์result() วิธีการกับมัน
การสร้างอินสแตนซ์ ProcessPoolExecutor - ตัวจัดการบริบท
อีกวิธีหนึ่งในการสร้างอินสแตนซ์ ProcessPoolExecutor คือด้วยความช่วยเหลือของผู้จัดการบริบท ทำงานคล้ายกับวิธีการที่ใช้ในตัวอย่างข้างต้น ข้อได้เปรียบหลักของการใช้ตัวจัดการบริบทคือมันดูดีในเชิงไวยากรณ์ การสร้างอินสแตนซ์สามารถทำได้ด้วยความช่วยเหลือของรหัสต่อไปนี้ -
with ProcessPoolExecutor(max_workers = 5) as executor
ตัวอย่าง
เพื่อความเข้าใจที่ดีขึ้นเราใช้ตัวอย่างเดียวกับที่ใช้ในการสร้างเธรดพูล ในตัวอย่างนี้เราต้องเริ่มต้นด้วยการนำเข้าไฟล์concurrent.futuresโมดูล. จากนั้นฟังก์ชั่นที่ชื่อว่าload_url()ถูกสร้างขึ้นซึ่งจะโหลด url ที่ร้องขอ ProcessPoolExecutorจากนั้นจะสร้างด้วยจำนวน 5 เธรดในพูล กระบวนการPoolExecutorถูกใช้เป็นตัวจัดการบริบท เราสามารถรับผลลัพธ์ของอนาคตได้โดยเรียกไฟล์result() วิธีการกับมัน
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
main()
เอาต์พุต
สคริปต์ Python ด้านบนจะสร้างผลลัพธ์ต่อไปนี้ -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
การใช้ฟังก์ชัน Executor.map ()
Python map()ฟังก์ชั่นใช้กันอย่างแพร่หลายในการทำงานหลายอย่าง ภารกิจอย่างหนึ่งคือการใช้ฟังก์ชันบางอย่างกับทุกองค์ประกอบภายในการวนซ้ำ ในทำนองเดียวกันเราสามารถแมปองค์ประกอบทั้งหมดของตัววนซ้ำกับฟังก์ชันและส่งงานเหล่านี้เป็นงานอิสระไปยังไฟล์ProcessPoolExecutor. พิจารณาตัวอย่างต่อไปนี้ของสคริปต์ Python เพื่อทำความเข้าใจสิ่งนี้
ตัวอย่าง
เราจะพิจารณาตัวอย่างเดียวกับที่เราใช้ในขณะสร้างเธรดพูลโดยใช้ Executor.map()ฟังก์ชัน ในตัวอย่างที่ระบุไว้ด้านล่างฟังก์ชันแผนที่ถูกใช้เพื่อนำไปใช้square() ฟังก์ชันกับทุกค่าในอาร์เรย์ค่า
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ProcessPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
เอาต์พุต
สคริปต์ Python ด้านบนจะสร้างผลลัพธ์ต่อไปนี้
4
9
16
25
ควรใช้ ProcessPoolExecutor และ ThreadPoolExecutor เมื่อใด
ตอนนี้เราได้ศึกษาเกี่ยวกับคลาส Executor ทั้ง ThreadPoolExecutor และ ProcessPoolExecutor แล้วเราจำเป็นต้องรู้ว่าเมื่อใดควรใช้ตัวดำเนินการใด เราจำเป็นต้องเลือก ProcessPoolExecutor ในกรณีของเวิร์กโหลดที่ผูกกับ CPU และ ThreadPoolExecutor ในกรณีของปริมาณงานที่ผูกกับ I / O
ถ้าเราใช้ ProcessPoolExecutorดังนั้นเราไม่จำเป็นต้องกังวลเกี่ยวกับ GIL เพราะมันใช้การประมวลผลหลายขั้นตอน ยิ่งไปกว่านั้นเวลาดำเนินการจะน้อยลงเมื่อเทียบกับThreadPoolExecution. พิจารณาตัวอย่างสคริปต์ Python ต่อไปนี้เพื่อทำความเข้าใจสิ่งนี้
ตัวอย่าง
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
เอาต์พุต
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
เอาต์พุต
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
จากผลลัพธ์ของทั้งสองโปรแกรมข้างต้นเราจะเห็นความแตกต่างของเวลาดำเนินการขณะใช้งาน ProcessPoolExecutor และ ThreadPoolExecutor.