ชั้นเรียนที่มีพนักงานหลายคน
ฉันกำลังสร้างเครื่องมือที่โต้ตอบกับสตรีมข้อมูลขาเข้าเป็นกลุ่ม ข้อมูลนี้ต้องได้รับการประมวลผลและผลลัพธ์ที่ได้กลับมา ในการแยกงานฉันได้สร้างคลาสที่มีคิวขาเข้า ( _in
) และขาออก ( out
) และคนงานที่กำลังรับประมวลผลและฝากงาน
ตัวอย่างนี้จะใช้เวลา iterable ของตัวเลข (ในpass_data
) f
และคูณพวกเขาโดย
import queue, random, time
from multiprocessing import Process, Queue
def _worker(_in, out, f):
"""Get work from _in and output processed data to out"""
while True:
try:
work = _in.get()
except queue.Empty:
continue
# simulate blocking for some time
time.sleep(random.uniform(0.01, 0.5))
out.put(work * f)
class C:
def __init__(self, f, threads=2):
self.f = f
self.threads = threads
self._in, self.out = Queue(), Queue()
self.args = (self._in, self.out, self.f)
self.workers = [
Process(target=_worker, args=self.args) for _ in range(self.threads)
]
def __repr__(self):
return f"{self.__class__.__name__}(threads={self.threads})"
def start(self):
"""Start all workers"""
for worker in self.workers:
worker.start()
def terminate(self):
"""Terminate all workers"""
for worker in self.workers:
worker.terminate()
def pass_data(self, data):
"""Pass data to the queue to be processed"""
for rec in data:
self._in.put(rec)
def get_completed(self):
"""Return a list of processed data"""
items = []
while True:
try:
items.append(self.out.get_nowait())
except queue.Empty:
break
return items
if __name__ == "__main__":
c = C(f=12, threads=2)
c.start()
for i in range(5):
s = 0
n = random.randint(1, 20)
c.pass_data(list(range(n)))
print(f"sent: {n}")
while s < n:
r = c.get_completed()
s += len(r)
if r:
print(len(r), end=", ")
time.sleep(random.uniform(0.01, 0.4))
print()
c.terminate()
นี่คือข้อพิสูจน์แนวคิดในขณะนี้ วิธีนี้มีข้อผิดพลาดหรือไม่? มีวิธีที่ดีกว่านี้หรือไม่!
ด้านที่ฉันตั้งใจจะกล่าวถึง:
- ขีด จำกัด ขนาดคิว
- ขีด จำกัด จำนวนเธรด
คำตอบ
ข้อสังเกตและสิ่งที่ต้องพิจารณามีดังนี้
คุณแน่ใจว่าต้องมีการประมวลผลหลายขั้นตอนหรือเธรด? ไม่มีข้อมูลใด ๆ ในคำถามที่จะบอกว่าเหตุใดจึงอาจจำเป็นต้องใช้ มีค่าใช้จ่ายสำหรับการใช้งาน บางทีการวนรอบอินพุต - คำนวณ - เอาต์พุตก็เพียงพอแล้ว
คุณคาดว่าโปรแกรมจะมีปริมาณงาน จำกัด โดย IO หรือโดยการประมวลผลของ CPU หลักการทั่วไปคือการใช้เธรดหรือasynchio
สำหรับอดีตและประมวลผลในภายหลัง
เป็นเรื่องสำคัญหรือไม่ที่ผลลัพธ์อาจไม่ถูกส่งกลับตามลำดับเดียวกันกับที่ส่งมา? พวกเขาต้องประทับเวลาหรือไม่?
threads
เป็นชื่อพารามิเตอร์ที่สับสนเมื่อใช้กระบวนการ
รหัสหลักปัจจุบันทำให้รายการในคิวอินพุตและรับรายการจากคิวเอาต์พุต หากคิวมีขนาด จำกัด จะเป็นไปได้ที่จะหยุดชะงักหากโค้ดหลักถูกบล็อกไม่ให้เพิ่มในคิวอินพุตแบบเต็มและคนงานถูกบล็อกไม่ให้เพิ่มไปยังคิวเอาต์พุตแบบเต็ม
ใช้ multiprocessing.Pool
multiprocessing
ห้องสมุดแล้วมีสระว่ายน้ำของผู้ปฏิบัติงานการดำเนินการพร้อมที่จะใช้ รหัสของคุณสามารถเขียนใหม่เป็น:
import time
from multiprocessing import Pool
def f(x):
time.sleep(random.uniform(0.01, 0.5))
return x * 12
if __name__ == "__main__":
c = Pool(2)
for i in range(5):
n = random.randint(1, 20)
r = c.map_async(f, list(range(n)))
print(f"sent: {n}")
print(f"got: {len(r.get())}")
แม้ว่าmultiprocessing.Pool
จะช่วยให้คุณตรวจสอบว่าผลลัพธ์พร้อมหรือไม่โดยใช้.ready()
กับผลลัพธ์ของการโทรapply_async()
หรือการmap_async()
โทร แต่คุณไม่สามารถรับผลลัพธ์บางส่วนmap_async()
ได้ อย่างไรก็ตามหากคุณต้องการประมวลผลผลลัพธ์แต่ละรายการทันทีที่พร้อมคุณสามารถพิจารณาโทรapply_async()
ด้วยฟังก์ชันโทรกลับที่จัดการผลลัพธ์ได้