Classe avec plusieurs travailleurs
Je construis un outil qui interagit avec un flux groupé de données entrantes. Ces données doivent être traitées et le résultat renvoyé. Pour diviser le travail, j'ai créé une classe qui a _in
des out
files d'attente entrantes ( ) et sortantes ( ) et des travailleurs qui obtiennent, traitent et déposent le travail.
Cet exemple prend un itérable de nombres (in pass_data
) et les multiplie par f
.
import queue, random, time
from multiprocessing import Process, Queue
def _worker(_in, out, f):
"""Get work from _in and output processed data to out"""
while True:
try:
work = _in.get()
except queue.Empty:
continue
# simulate blocking for some time
time.sleep(random.uniform(0.01, 0.5))
out.put(work * f)
class C:
def __init__(self, f, threads=2):
self.f = f
self.threads = threads
self._in, self.out = Queue(), Queue()
self.args = (self._in, self.out, self.f)
self.workers = [
Process(target=_worker, args=self.args) for _ in range(self.threads)
]
def __repr__(self):
return f"{self.__class__.__name__}(threads={self.threads})"
def start(self):
"""Start all workers"""
for worker in self.workers:
worker.start()
def terminate(self):
"""Terminate all workers"""
for worker in self.workers:
worker.terminate()
def pass_data(self, data):
"""Pass data to the queue to be processed"""
for rec in data:
self._in.put(rec)
def get_completed(self):
"""Return a list of processed data"""
items = []
while True:
try:
items.append(self.out.get_nowait())
except queue.Empty:
break
return items
if __name__ == "__main__":
c = C(f=12, threads=2)
c.start()
for i in range(5):
s = 0
n = random.randint(1, 20)
c.pass_data(list(range(n)))
print(f"sent: {n}")
while s < n:
r = c.get_completed()
s += len(r)
if r:
print(len(r), end=", ")
time.sleep(random.uniform(0.01, 0.4))
print()
c.terminate()
C'est, pour le moment, une preuve de concept. Y a-t-il des écueils à cette méthode? Existe-t-il déjà une meilleure façon de le faire?!
Aspects que je compte aborder:
- limites de taille de file d'attente
- limites du nombre de threads
Réponses
Voici quelques observations et choses à considérer.
Êtes-vous sûr d'avoir besoin de multitraitement ou de threads? Il n'y a aucune information dans la question pour expliquer pourquoi ils peuvent être nécessaires. Il y a des frais généraux pour leur utilisation. Peut-être qu'une boucle entrée-calcul-sortie est suffisante.
Prévoyez-vous que le programme aura un débit limité par les E / S ou par le traitement CPU? La règle générale consiste à utiliser des threads ou asynchio
pour le premier et des processus pour le plus tard.
Est-il important que les résultats ne soient pas renvoyés dans le même ordre qu'ils ont été soumis? Doivent-ils être horodatés?
threads
est un nom de paramètre déroutant lors de l'utilisation de processus.
Le code principal actuel place les éléments dans la file d'attente d'entrée et récupère les éléments de la file d'attente de sortie. Si les files d'attente ont des tailles limitées, il sera possible de se bloquer si le code principal est bloqué lors de l'ajout à une file d'attente d'entrée complète et que les nœuds de calcul ne peuvent pas s'ajouter à une file d'attente de sortie complète.
Utilisation multiprocessing.Pool
La multiprocessing
bibliothèque a déjà une implémentation de pool de travail prête à être utilisée. Votre code pourrait être réécrit comme:
import time
from multiprocessing import Pool
def f(x):
time.sleep(random.uniform(0.01, 0.5))
return x * 12
if __name__ == "__main__":
c = Pool(2)
for i in range(5):
n = random.randint(1, 20)
r = c.map_async(f, list(range(n)))
print(f"sent: {n}")
print(f"got: {len(r.get())}")
Alors que multiprocessing.Pool
vous permet de vérifier si les résultats sont prêts en utilisant .ready()
le résultat d'un appel apply_async()
ou map_async()
, vous ne pouvez pas obtenir un résultat partiel à partir de map_async()
. Cependant, si vous souhaitez traiter les résultats individuels dès qu'ils sont prêts, vous pouvez envisager d'appeler apply_async()
avec une fonction de rappel qui gère le résultat.