Clase con varios trabajadores

Aug 19 2020

Estoy construyendo una herramienta que interactúa con un flujo por lotes de datos entrantes. Estos datos deben procesarse y devolverse el resultado. Para dividir el trabajo, he creado una clase que tiene colas de entrada ( _in) y saliente ( out) y trabajadores que están recibiendo, procesando y depositando el trabajo.

Este ejemplo toma un iterable de números (in pass_data) y los multiplica por f.

import queue, random, time
from multiprocessing import Process, Queue

def _worker(_in, out, f):
    """Get work from _in and output processed data to out"""
    while True:
        try:
            work = _in.get()
        except queue.Empty:
            continue
        # simulate blocking for some time
        time.sleep(random.uniform(0.01, 0.5))
        out.put(work * f)

class C:
    def __init__(self, f, threads=2):
        self.f = f
        self.threads = threads
        self._in, self.out = Queue(), Queue()
        self.args = (self._in, self.out, self.f)
        self.workers = [
            Process(target=_worker, args=self.args) for _ in range(self.threads)
        ]

    def __repr__(self):
        return f"{self.__class__.__name__}(threads={self.threads})"

    def start(self):
        """Start all workers"""
        for worker in self.workers:
            worker.start()

    def terminate(self):
        """Terminate all workers"""
        for worker in self.workers:
            worker.terminate()

    def pass_data(self, data):
        """Pass data to the queue to be processed"""
        for rec in data:
            self._in.put(rec)

    def get_completed(self):
        """Return a list of processed data"""
        items = []
        while True:
            try:
                items.append(self.out.get_nowait())
            except queue.Empty:
                break
        return items

if __name__ == "__main__":
    c = C(f=12, threads=2)
    c.start()

    for i in range(5):
        s = 0
        n = random.randint(1, 20)
        c.pass_data(list(range(n)))
        print(f"sent: {n}")
        while s < n:
            r = c.get_completed()
            s += len(r)
            if r:
                print(len(r), end=", ")
            time.sleep(random.uniform(0.01, 0.4))
        print()
    c.terminate()

Esto es, por el momento, una prueba de concepto. ¿Hay algún inconveniente en este método? ¿Hay alguna forma mejor de hacer esto ya?

Aspectos que pretendo abordar:

  • límites de tamaño de la cola
  • límites de número de hilo

Respuestas

4 RootTwo Sep 02 2020 at 01:44

Aquí hay algunas observaciones y cosas a considerar.

¿Estás seguro de que necesitas multiprocesamiento o subprocesos? No hay ninguna información en la pregunta que indique por qué pueden ser necesarios. Hay gastos generales para usarlos. Quizás un bucle entrada-cálculo-salida sea suficiente.

¿Anticipa que el programa tendrá un rendimiento limitado por IO o por el procesamiento de la CPU? La regla general es utilizar subprocesos o asynchiopara los primeros y procesos para los últimos.

¿Importa que los resultados no se devuelvan en el mismo orden en que se enviaron? ¿Necesitan tener una marca de tiempo?

threads es un nombre de parámetro confuso cuando se utilizan procesos.

El código principal actual coloca elementos en la cola de entrada y obtiene elementos de la cola de salida. Si las colas tienen tamaños limitados, será posible interbloquear si el código principal está bloqueado al agregarlo a una cola de entrada completa y los trabajadores no pueden agregar a una cola de salida completa.

5 G.Sliepen Sep 02 2020 at 01:24

Utilizar multiprocessing.Pool

La multiprocessingbiblioteca ya tiene una implementación de grupo de trabajadores lista para ser utilizada. Su código podría reescribirse como:

import time
from multiprocessing import Pool

def f(x):
    time.sleep(random.uniform(0.01, 0.5))
    return x * 12

if __name__ == "__main__":
    c = Pool(2)

    for i in range(5):
        n = random.randint(1, 20)
        r = c.map_async(f, list(range(n)))
        print(f"sent: {n}")
        print(f"got: {len(r.get())}")

Si bien le multiprocessing.Poolpermite verificar si los resultados están listos usando .ready()en el resultado de una llamada apply_async()o map_async(), no puede obtener un resultado parcial de map_async(). Sin embargo, si desea procesar resultados individuales tan pronto como estén listos, puede considerar llamar apply_async()con una función de devolución de llamada que maneja el resultado.