Classe con più lavoratori
Sto costruendo uno strumento che interagisce con un flusso in batch di dati in arrivo. Questi dati devono essere elaborati e il risultato restituito. Per suddividere il lavoro ho creato una classe che ha code inbound ( _in
) e outbound ( out
) e worker che stanno ricevendo, elaborando e depositando il lavoro.
Questo esempio prende un iterabile di numeri (in pass_data
) e li moltiplica per f
.
import queue, random, time
from multiprocessing import Process, Queue
def _worker(_in, out, f):
"""Get work from _in and output processed data to out"""
while True:
try:
work = _in.get()
except queue.Empty:
continue
# simulate blocking for some time
time.sleep(random.uniform(0.01, 0.5))
out.put(work * f)
class C:
def __init__(self, f, threads=2):
self.f = f
self.threads = threads
self._in, self.out = Queue(), Queue()
self.args = (self._in, self.out, self.f)
self.workers = [
Process(target=_worker, args=self.args) for _ in range(self.threads)
]
def __repr__(self):
return f"{self.__class__.__name__}(threads={self.threads})"
def start(self):
"""Start all workers"""
for worker in self.workers:
worker.start()
def terminate(self):
"""Terminate all workers"""
for worker in self.workers:
worker.terminate()
def pass_data(self, data):
"""Pass data to the queue to be processed"""
for rec in data:
self._in.put(rec)
def get_completed(self):
"""Return a list of processed data"""
items = []
while True:
try:
items.append(self.out.get_nowait())
except queue.Empty:
break
return items
if __name__ == "__main__":
c = C(f=12, threads=2)
c.start()
for i in range(5):
s = 0
n = random.randint(1, 20)
c.pass_data(list(range(n)))
print(f"sent: {n}")
while s < n:
r = c.get_completed()
s += len(r)
if r:
print(len(r), end=", ")
time.sleep(random.uniform(0.01, 0.4))
print()
c.terminate()
Questa è, al momento, una prova di concetto. Ci sono delle insidie a questo metodo? C'è già un modo migliore per farlo ?!
Aspetti che intendo affrontare:
- limiti di dimensione della coda
- limiti del numero di thread
Risposte
Ecco alcune osservazioni e cose da considerare.
Sei sicuro di aver bisogno di multiprocessing o thread? Non ci sono informazioni nella domanda per dire perché potrebbero essere necessarie. C'è un sovraccarico per usarli. Forse è sufficiente un ciclo input-calcolo-output.
Prevedi che il programma abbia un throughput limitato dall'I / O o dall'elaborazione della CPU. La regola generale è usare thread o asynchio
per il primo e processi per il secondo.
È importante che i risultati non vengano restituiti nello stesso ordine in cui sono stati inviati? Devono essere contrassegnati con data e ora?
threads
è un nome di parametro che confonde quando si utilizzano processi.
Il codice principale corrente inserisce gli elementi nella coda di input e ottiene gli elementi dalla coda di output. Se le code hanno dimensioni limitate, sarà possibile eseguire il deadlock se il codice principale viene bloccato durante l'aggiunta a una coda di input completa e ai worker viene impedito di aggiungere a una coda di output completa.
Uso multiprocessing.Pool
La multiprocessing
libreria ha già un'implementazione del pool di nodi di lavoro pronta per essere utilizzata. Il tuo codice potrebbe essere riscritto come:
import time
from multiprocessing import Pool
def f(x):
time.sleep(random.uniform(0.01, 0.5))
return x * 12
if __name__ == "__main__":
c = Pool(2)
for i in range(5):
n = random.randint(1, 20)
r = c.map_async(f, list(range(n)))
print(f"sent: {n}")
print(f"got: {len(r.get())}")
Sebbene multiprocessing.Pool
ti consenta di verificare se i risultati sono pronti utilizzando .ready()
il risultato di una chiamata apply_async()
o map_async()
, non puoi ottenere un risultato parziale da map_async()
. Tuttavia, se desideri elaborare i singoli risultati non appena sono pronti, puoi considerare di chiamare apply_async()
con una funzione di callback che gestisce il risultato.