การทำงานพร้อมกันใน Python - กลุ่มของกระบวนการ

กลุ่มของกระบวนการสามารถสร้างและใช้ในลักษณะเดียวกับที่เราสร้างและใช้กลุ่มของเธรด พูลกระบวนการสามารถกำหนดเป็นกลุ่มของกระบวนการที่สร้างอินสแตนซ์ล่วงหน้าและไม่ได้ใช้งานซึ่งพร้อมที่จะรับงาน การสร้างพูลกระบวนการเป็นที่ต้องการมากกว่าการสร้างอินสแตนซ์กระบวนการใหม่สำหรับทุกงานเมื่อเราจำเป็นต้องทำงานจำนวนมาก

โมดูล Python - Concurrent.futures

ไลบรารีมาตรฐาน Python มีโมดูลที่เรียกว่า concurrent.futures. โมดูลนี้ถูกเพิ่มใน Python 3.2 เพื่อให้นักพัฒนามีอินเทอร์เฟซระดับสูงสำหรับการเรียกใช้งานแบบอะซิงโครนัส เป็นเลเยอร์นามธรรมที่ด้านบนของโมดูลเธรดและการประมวลผลหลายส่วนของ Python สำหรับจัดเตรียมอินเทอร์เฟซสำหรับรันงานโดยใช้กลุ่มเธรดหรือกระบวนการ

ในส่วนต่อไปเราจะดูคลาสย่อยที่แตกต่างกันของโมดูล concurrent.futures

ระดับผู้บริหาร

Executor เป็นคลาสนามธรรมของ concurrent.futuresโมดูล Python ไม่สามารถใช้ได้โดยตรงและเราจำเป็นต้องใช้หนึ่งในคลาสย่อยคอนกรีตต่อไปนี้ -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor - คลาสย่อยที่เป็นรูปธรรม

มันเป็นหนึ่งในคลาสย่อยที่เป็นรูปธรรมของคลาส Executor มันใช้การประมวลผลหลายขั้นตอนและเราได้รับกลุ่มของกระบวนการในการส่งงาน พูลนี้มอบหมายงานให้กับกระบวนการที่มีและกำหนดเวลาให้ทำงาน

จะสร้าง ProcessPoolExecutor ได้อย่างไร?

ด้วยความช่วยเหลือของ concurrent.futures โมดูลและคลาสย่อยคอนกรีต Executorเราสามารถสร้างกลุ่มของกระบวนการได้อย่างง่ายดาย สำหรับสิ่งนี้เราจำเป็นต้องสร้างไฟล์ProcessPoolExecutorด้วยจำนวนกระบวนการที่เราต้องการในพูล ตามค่าเริ่มต้นตัวเลขคือ 5 ตามด้วยการส่งงานไปยังพูลกระบวนการ

ตัวอย่าง

ตอนนี้เราจะพิจารณาตัวอย่างเดียวกับที่เราใช้ในขณะสร้างเธรดพูลข้อแตกต่างเพียงอย่างเดียวคือตอนนี้เราจะใช้ ProcessPoolExecutor แทน ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

เอาต์พุต

False
False
Completed

ในตัวอย่างข้างต้นกระบวนการPoolExecutorถูกสร้างขึ้นด้วย 5 เธรด จากนั้นงานซึ่งจะรอ 2 วินาทีก่อนที่จะส่งข้อความจะถูกส่งไปยังตัวดำเนินการพูลกระบวนการ ดังที่เห็นจากผลลัพธ์งานจะไม่เสร็จสมบูรณ์จนกว่าจะถึง 2 วินาทีดังนั้นการโทรครั้งแรกไปที่done()จะส่งคืน False หลังจาก 2 วินาทีงานจะเสร็จสิ้นและเราจะได้รับผลลัพธ์ของอนาคตโดยเรียกไฟล์result() วิธีการกับมัน

การสร้างอินสแตนซ์ ProcessPoolExecutor - ตัวจัดการบริบท

อีกวิธีหนึ่งในการสร้างอินสแตนซ์ ProcessPoolExecutor คือด้วยความช่วยเหลือของผู้จัดการบริบท ทำงานคล้ายกับวิธีการที่ใช้ในตัวอย่างข้างต้น ข้อได้เปรียบหลักของการใช้ตัวจัดการบริบทคือมันดูดีในเชิงไวยากรณ์ การสร้างอินสแตนซ์สามารถทำได้ด้วยความช่วยเหลือของรหัสต่อไปนี้ -

with ProcessPoolExecutor(max_workers = 5) as executor

ตัวอย่าง

เพื่อความเข้าใจที่ดีขึ้นเราใช้ตัวอย่างเดียวกับที่ใช้ในการสร้างเธรดพูล ในตัวอย่างนี้เราต้องเริ่มต้นด้วยการนำเข้าไฟล์concurrent.futuresโมดูล. จากนั้นฟังก์ชั่นที่ชื่อว่าload_url()ถูกสร้างขึ้นซึ่งจะโหลด url ที่ร้องขอ ProcessPoolExecutorจากนั้นจะสร้างด้วยจำนวน 5 เธรดในพูล กระบวนการPoolExecutorถูกใช้เป็นตัวจัดการบริบท เราสามารถรับผลลัพธ์ของอนาคตได้โดยเรียกไฟล์result() วิธีการกับมัน

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

เอาต์พุต

สคริปต์ Python ด้านบนจะสร้างผลลัพธ์ต่อไปนี้ -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

การใช้ฟังก์ชัน Executor.map ()

Python map()ฟังก์ชั่นใช้กันอย่างแพร่หลายในการทำงานหลายอย่าง ภารกิจอย่างหนึ่งคือการใช้ฟังก์ชันบางอย่างกับทุกองค์ประกอบภายในการวนซ้ำ ในทำนองเดียวกันเราสามารถแมปองค์ประกอบทั้งหมดของตัววนซ้ำกับฟังก์ชันและส่งงานเหล่านี้เป็นงานอิสระไปยังไฟล์ProcessPoolExecutor. พิจารณาตัวอย่างต่อไปนี้ของสคริปต์ Python เพื่อทำความเข้าใจสิ่งนี้

ตัวอย่าง

เราจะพิจารณาตัวอย่างเดียวกับที่เราใช้ในขณะสร้างเธรดพูลโดยใช้ Executor.map()ฟังก์ชัน ในตัวอย่างที่ระบุไว้ด้านล่างฟังก์ชันแผนที่ถูกใช้เพื่อนำไปใช้square() ฟังก์ชันกับทุกค่าในอาร์เรย์ค่า

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

เอาต์พุต

สคริปต์ Python ด้านบนจะสร้างผลลัพธ์ต่อไปนี้

4
9
16
25

ควรใช้ ProcessPoolExecutor และ ThreadPoolExecutor เมื่อใด

ตอนนี้เราได้ศึกษาเกี่ยวกับคลาส Executor ทั้ง ThreadPoolExecutor และ ProcessPoolExecutor แล้วเราจำเป็นต้องรู้ว่าเมื่อใดควรใช้ตัวดำเนินการใด เราจำเป็นต้องเลือก ProcessPoolExecutor ในกรณีของเวิร์กโหลดที่ผูกกับ CPU และ ThreadPoolExecutor ในกรณีของปริมาณงานที่ผูกกับ I / O

ถ้าเราใช้ ProcessPoolExecutorดังนั้นเราไม่จำเป็นต้องกังวลเกี่ยวกับ GIL เพราะมันใช้การประมวลผลหลายขั้นตอน ยิ่งไปกว่านั้นเวลาดำเนินการจะน้อยลงเมื่อเทียบกับThreadPoolExecution. พิจารณาตัวอย่างสคริปต์ Python ต่อไปนี้เพื่อทำความเข้าใจสิ่งนี้

ตัวอย่าง

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

เอาต์พุต

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

เอาต์พุต

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

จากผลลัพธ์ของทั้งสองโปรแกรมข้างต้นเราจะเห็นความแตกต่างของเวลาดำเนินการขณะใช้งาน ProcessPoolExecutor และ ThreadPoolExecutor.