Kelas dengan banyak pekerja

Aug 19 2020

Saya membangun alat yang berinteraksi dengan aliran data masuk yang terkumpul. Data ini perlu diproses dan hasilnya dikembalikan. Untuk memisahkan pekerjaan, saya telah membuat kelas yang memiliki antrian masuk ( _in) dan keluar ( out) dan pekerja yang mendapatkan, memproses, dan menyimpan pekerjaan.

Contoh ini mengambil iterable angka (dalam pass_data) dan mengalikannya dengan f.

import queue, random, time
from multiprocessing import Process, Queue

def _worker(_in, out, f):
    """Get work from _in and output processed data to out"""
    while True:
        try:
            work = _in.get()
        except queue.Empty:
            continue
        # simulate blocking for some time
        time.sleep(random.uniform(0.01, 0.5))
        out.put(work * f)

class C:
    def __init__(self, f, threads=2):
        self.f = f
        self.threads = threads
        self._in, self.out = Queue(), Queue()
        self.args = (self._in, self.out, self.f)
        self.workers = [
            Process(target=_worker, args=self.args) for _ in range(self.threads)
        ]

    def __repr__(self):
        return f"{self.__class__.__name__}(threads={self.threads})"

    def start(self):
        """Start all workers"""
        for worker in self.workers:
            worker.start()

    def terminate(self):
        """Terminate all workers"""
        for worker in self.workers:
            worker.terminate()

    def pass_data(self, data):
        """Pass data to the queue to be processed"""
        for rec in data:
            self._in.put(rec)

    def get_completed(self):
        """Return a list of processed data"""
        items = []
        while True:
            try:
                items.append(self.out.get_nowait())
            except queue.Empty:
                break
        return items

if __name__ == "__main__":
    c = C(f=12, threads=2)
    c.start()

    for i in range(5):
        s = 0
        n = random.randint(1, 20)
        c.pass_data(list(range(n)))
        print(f"sent: {n}")
        while s < n:
            r = c.get_completed()
            s += len(r)
            if r:
                print(len(r), end=", ")
            time.sleep(random.uniform(0.01, 0.4))
        print()
    c.terminate()

Ini, pada saat ini, merupakan bukti konsep. Apakah ada kendala untuk metode ini? Apakah sudah ada cara yang lebih baik untuk melakukan ini ?!

Aspek yang ingin saya bahas:

  • batas ukuran antrian
  • batas nomor benang

Jawaban

4 RootTwo Sep 02 2020 at 01:44

Berikut beberapa observasi dan hal yang perlu diperhatikan.

Apakah Anda yakin Anda membutuhkan multiprocessing atau threads? Tidak ada informasi dalam pertanyaan untuk mengatakan mengapa mereka mungkin dibutuhkan. Ada biaya tambahan untuk menggunakannya. Mungkin loop input-hitung-keluaran sudah cukup.

Apakah Anda mengantisipasi program memiliki throughput yang dibatasi oleh IO atau oleh pemrosesan CPU. Aturan umumnya adalah menggunakan utas atau utas asynchiodan proses untuk nanti.

Apakah penting bahwa hasil tidak dikembalikan dengan urutan yang sama seperti saat dikirimkan? Apakah mereka perlu diberi cap waktu?

threads adalah nama parameter yang membingungkan saat menggunakan proses.

Kode utama saat ini menempatkan item dalam antrian masukan dan mendapatkan item dari antrian keluaran. Jika antrian memiliki ukuran terbatas, maka akan mungkin terjadi deadlock jika kode utama diblokir saat menambah antrian input penuh dan pekerja diblokir agar tidak menambah antrian output penuh.

5 G.Sliepen Sep 02 2020 at 01:24

Menggunakan multiprocessing.Pool

The multiprocessingperpustakaan sudah memiliki pekerja kolam renang pelaksanaan siap untuk digunakan. Kode Anda dapat ditulis ulang sebagai:

import time
from multiprocessing import Pool

def f(x):
    time.sleep(random.uniform(0.01, 0.5))
    return x * 12

if __name__ == "__main__":
    c = Pool(2)

    for i in range(5):
        n = random.randint(1, 20)
        r = c.map_async(f, list(range(n)))
        print(f"sent: {n}")
        print(f"got: {len(r.get())}")

While multiprocessing.Poolmemungkinkan Anda untuk memeriksa apakah hasil sudah siap dengan menggunakan .ready()hasil apply_async()atau map_async()panggilan, Anda tidak bisa mendapatkan hasil parsial map_async(). Namun, jika Anda ingin memproses hasil individual segera setelah siap, Anda dapat mempertimbangkan untuk memanggil apply_async()dengan fungsi callback yang menangani hasilnya.