Clase con varios trabajadores
Estoy construyendo una herramienta que interactúa con un flujo por lotes de datos entrantes. Estos datos deben procesarse y devolverse el resultado. Para dividir el trabajo, he creado una clase que tiene colas de entrada ( _in
) y saliente ( out
) y trabajadores que están recibiendo, procesando y depositando el trabajo.
Este ejemplo toma un iterable de números (in pass_data
) y los multiplica por f
.
import queue, random, time
from multiprocessing import Process, Queue
def _worker(_in, out, f):
"""Get work from _in and output processed data to out"""
while True:
try:
work = _in.get()
except queue.Empty:
continue
# simulate blocking for some time
time.sleep(random.uniform(0.01, 0.5))
out.put(work * f)
class C:
def __init__(self, f, threads=2):
self.f = f
self.threads = threads
self._in, self.out = Queue(), Queue()
self.args = (self._in, self.out, self.f)
self.workers = [
Process(target=_worker, args=self.args) for _ in range(self.threads)
]
def __repr__(self):
return f"{self.__class__.__name__}(threads={self.threads})"
def start(self):
"""Start all workers"""
for worker in self.workers:
worker.start()
def terminate(self):
"""Terminate all workers"""
for worker in self.workers:
worker.terminate()
def pass_data(self, data):
"""Pass data to the queue to be processed"""
for rec in data:
self._in.put(rec)
def get_completed(self):
"""Return a list of processed data"""
items = []
while True:
try:
items.append(self.out.get_nowait())
except queue.Empty:
break
return items
if __name__ == "__main__":
c = C(f=12, threads=2)
c.start()
for i in range(5):
s = 0
n = random.randint(1, 20)
c.pass_data(list(range(n)))
print(f"sent: {n}")
while s < n:
r = c.get_completed()
s += len(r)
if r:
print(len(r), end=", ")
time.sleep(random.uniform(0.01, 0.4))
print()
c.terminate()
Esto es, por el momento, una prueba de concepto. ¿Hay algún inconveniente en este método? ¿Hay alguna forma mejor de hacer esto ya?
Aspectos que pretendo abordar:
- límites de tamaño de la cola
- límites de número de hilo
Respuestas
Aquí hay algunas observaciones y cosas a considerar.
¿Estás seguro de que necesitas multiprocesamiento o subprocesos? No hay ninguna información en la pregunta que indique por qué pueden ser necesarios. Hay gastos generales para usarlos. Quizás un bucle entrada-cálculo-salida sea suficiente.
¿Anticipa que el programa tendrá un rendimiento limitado por IO o por el procesamiento de la CPU? La regla general es utilizar subprocesos o asynchio
para los primeros y procesos para los últimos.
¿Importa que los resultados no se devuelvan en el mismo orden en que se enviaron? ¿Necesitan tener una marca de tiempo?
threads
es un nombre de parámetro confuso cuando se utilizan procesos.
El código principal actual coloca elementos en la cola de entrada y obtiene elementos de la cola de salida. Si las colas tienen tamaños limitados, será posible interbloquear si el código principal está bloqueado al agregarlo a una cola de entrada completa y los trabajadores no pueden agregar a una cola de salida completa.
Utilizar multiprocessing.Pool
La multiprocessing
biblioteca ya tiene una implementación de grupo de trabajadores lista para ser utilizada. Su código podría reescribirse como:
import time
from multiprocessing import Pool
def f(x):
time.sleep(random.uniform(0.01, 0.5))
return x * 12
if __name__ == "__main__":
c = Pool(2)
for i in range(5):
n = random.randint(1, 20)
r = c.map_async(f, list(range(n)))
print(f"sent: {n}")
print(f"got: {len(r.get())}")
Si bien le multiprocessing.Pool
permite verificar si los resultados están listos usando .ready()
en el resultado de una llamada apply_async()
o map_async()
, no puede obtener un resultado parcial de map_async()
. Sin embargo, si desea procesar resultados individuales tan pronto como estén listos, puede considerar llamar apply_async()
con una función de devolución de llamada que maneja el resultado.