ชั้นเรียนที่มีพนักงานหลายคน

Aug 19 2020

ฉันกำลังสร้างเครื่องมือที่โต้ตอบกับสตรีมข้อมูลขาเข้าเป็นกลุ่ม ข้อมูลนี้ต้องได้รับการประมวลผลและผลลัพธ์ที่ได้กลับมา ในการแยกงานฉันได้สร้างคลาสที่มีคิวขาเข้า ( _in) และขาออก ( out) และคนงานที่กำลังรับประมวลผลและฝากงาน

ตัวอย่างนี้จะใช้เวลา iterable ของตัวเลข (ในpass_data) fและคูณพวกเขาโดย

import queue, random, time
from multiprocessing import Process, Queue

def _worker(_in, out, f):
    """Get work from _in and output processed data to out"""
    while True:
        try:
            work = _in.get()
        except queue.Empty:
            continue
        # simulate blocking for some time
        time.sleep(random.uniform(0.01, 0.5))
        out.put(work * f)

class C:
    def __init__(self, f, threads=2):
        self.f = f
        self.threads = threads
        self._in, self.out = Queue(), Queue()
        self.args = (self._in, self.out, self.f)
        self.workers = [
            Process(target=_worker, args=self.args) for _ in range(self.threads)
        ]

    def __repr__(self):
        return f"{self.__class__.__name__}(threads={self.threads})"

    def start(self):
        """Start all workers"""
        for worker in self.workers:
            worker.start()

    def terminate(self):
        """Terminate all workers"""
        for worker in self.workers:
            worker.terminate()

    def pass_data(self, data):
        """Pass data to the queue to be processed"""
        for rec in data:
            self._in.put(rec)

    def get_completed(self):
        """Return a list of processed data"""
        items = []
        while True:
            try:
                items.append(self.out.get_nowait())
            except queue.Empty:
                break
        return items

if __name__ == "__main__":
    c = C(f=12, threads=2)
    c.start()

    for i in range(5):
        s = 0
        n = random.randint(1, 20)
        c.pass_data(list(range(n)))
        print(f"sent: {n}")
        while s < n:
            r = c.get_completed()
            s += len(r)
            if r:
                print(len(r), end=", ")
            time.sleep(random.uniform(0.01, 0.4))
        print()
    c.terminate()

นี่คือข้อพิสูจน์แนวคิดในขณะนี้ วิธีนี้มีข้อผิดพลาดหรือไม่? มีวิธีที่ดีกว่านี้หรือไม่!

ด้านที่ฉันตั้งใจจะกล่าวถึง:

  • ขีด จำกัด ขนาดคิว
  • ขีด จำกัด จำนวนเธรด

คำตอบ

4 RootTwo Sep 02 2020 at 01:44

ข้อสังเกตและสิ่งที่ต้องพิจารณามีดังนี้

คุณแน่ใจว่าต้องมีการประมวลผลหลายขั้นตอนหรือเธรด? ไม่มีข้อมูลใด ๆ ในคำถามที่จะบอกว่าเหตุใดจึงอาจจำเป็นต้องใช้ มีค่าใช้จ่ายสำหรับการใช้งาน บางทีการวนรอบอินพุต - คำนวณ - เอาต์พุตก็เพียงพอแล้ว

คุณคาดว่าโปรแกรมจะมีปริมาณงาน จำกัด โดย IO หรือโดยการประมวลผลของ CPU หลักการทั่วไปคือการใช้เธรดหรือasynchioสำหรับอดีตและประมวลผลในภายหลัง

เป็นเรื่องสำคัญหรือไม่ที่ผลลัพธ์อาจไม่ถูกส่งกลับตามลำดับเดียวกันกับที่ส่งมา? พวกเขาต้องประทับเวลาหรือไม่?

threads เป็นชื่อพารามิเตอร์ที่สับสนเมื่อใช้กระบวนการ

รหัสหลักปัจจุบันทำให้รายการในคิวอินพุตและรับรายการจากคิวเอาต์พุต หากคิวมีขนาด จำกัด จะเป็นไปได้ที่จะหยุดชะงักหากโค้ดหลักถูกบล็อกไม่ให้เพิ่มในคิวอินพุตแบบเต็มและคนงานถูกบล็อกไม่ให้เพิ่มไปยังคิวเอาต์พุตแบบเต็ม

5 G.Sliepen Sep 02 2020 at 01:24

ใช้ multiprocessing.Pool

multiprocessingห้องสมุดแล้วมีสระว่ายน้ำของผู้ปฏิบัติงานการดำเนินการพร้อมที่จะใช้ รหัสของคุณสามารถเขียนใหม่เป็น:

import time
from multiprocessing import Pool

def f(x):
    time.sleep(random.uniform(0.01, 0.5))
    return x * 12

if __name__ == "__main__":
    c = Pool(2)

    for i in range(5):
        n = random.randint(1, 20)
        r = c.map_async(f, list(range(n)))
        print(f"sent: {n}")
        print(f"got: {len(r.get())}")

แม้ว่าmultiprocessing.Poolจะช่วยให้คุณตรวจสอบว่าผลลัพธ์พร้อมหรือไม่โดยใช้.ready()กับผลลัพธ์ของการโทรapply_async()หรือการmap_async()โทร แต่คุณไม่สามารถรับผลลัพธ์บางส่วนmap_async()ได้ อย่างไรก็ตามหากคุณต้องการประมวลผลผลลัพธ์แต่ละรายการทันทีที่พร้อมคุณสามารถพิจารณาโทรapply_async()ด้วยฟังก์ชันโทรกลับที่จัดการผลลัพธ์ได้