Classe avec plusieurs travailleurs

Aug 19 2020

Je construis un outil qui interagit avec un flux groupé de données entrantes. Ces données doivent être traitées et le résultat renvoyé. Pour diviser le travail, j'ai créé une classe qui a _indes outfiles d'attente entrantes ( ) et sortantes ( ) et des travailleurs qui obtiennent, traitent et déposent le travail.

Cet exemple prend un itérable de nombres (in pass_data) et les multiplie par f.

import queue, random, time
from multiprocessing import Process, Queue

def _worker(_in, out, f):
    """Get work from _in and output processed data to out"""
    while True:
        try:
            work = _in.get()
        except queue.Empty:
            continue
        # simulate blocking for some time
        time.sleep(random.uniform(0.01, 0.5))
        out.put(work * f)

class C:
    def __init__(self, f, threads=2):
        self.f = f
        self.threads = threads
        self._in, self.out = Queue(), Queue()
        self.args = (self._in, self.out, self.f)
        self.workers = [
            Process(target=_worker, args=self.args) for _ in range(self.threads)
        ]

    def __repr__(self):
        return f"{self.__class__.__name__}(threads={self.threads})"

    def start(self):
        """Start all workers"""
        for worker in self.workers:
            worker.start()

    def terminate(self):
        """Terminate all workers"""
        for worker in self.workers:
            worker.terminate()

    def pass_data(self, data):
        """Pass data to the queue to be processed"""
        for rec in data:
            self._in.put(rec)

    def get_completed(self):
        """Return a list of processed data"""
        items = []
        while True:
            try:
                items.append(self.out.get_nowait())
            except queue.Empty:
                break
        return items

if __name__ == "__main__":
    c = C(f=12, threads=2)
    c.start()

    for i in range(5):
        s = 0
        n = random.randint(1, 20)
        c.pass_data(list(range(n)))
        print(f"sent: {n}")
        while s < n:
            r = c.get_completed()
            s += len(r)
            if r:
                print(len(r), end=", ")
            time.sleep(random.uniform(0.01, 0.4))
        print()
    c.terminate()

C'est, pour le moment, une preuve de concept. Y a-t-il des écueils à cette méthode? Existe-t-il déjà une meilleure façon de le faire?!

Aspects que je compte aborder:

  • limites de taille de file d'attente
  • limites du nombre de threads

Réponses

4 RootTwo Sep 02 2020 at 01:44

Voici quelques observations et choses à considérer.

Êtes-vous sûr d'avoir besoin de multitraitement ou de threads? Il n'y a aucune information dans la question pour expliquer pourquoi ils peuvent être nécessaires. Il y a des frais généraux pour leur utilisation. Peut-être qu'une boucle entrée-calcul-sortie est suffisante.

Prévoyez-vous que le programme aura un débit limité par les E / S ou par le traitement CPU? La règle générale consiste à utiliser des threads ou asynchiopour le premier et des processus pour le plus tard.

Est-il important que les résultats ne soient pas renvoyés dans le même ordre qu'ils ont été soumis? Doivent-ils être horodatés?

threads est un nom de paramètre déroutant lors de l'utilisation de processus.

Le code principal actuel place les éléments dans la file d'attente d'entrée et récupère les éléments de la file d'attente de sortie. Si les files d'attente ont des tailles limitées, il sera possible de se bloquer si le code principal est bloqué lors de l'ajout à une file d'attente d'entrée complète et que les nœuds de calcul ne peuvent pas s'ajouter à une file d'attente de sortie complète.

5 G.Sliepen Sep 02 2020 at 01:24

Utilisation multiprocessing.Pool

La multiprocessingbibliothèque a déjà une implémentation de pool de travail prête à être utilisée. Votre code pourrait être réécrit comme:

import time
from multiprocessing import Pool

def f(x):
    time.sleep(random.uniform(0.01, 0.5))
    return x * 12

if __name__ == "__main__":
    c = Pool(2)

    for i in range(5):
        n = random.randint(1, 20)
        r = c.map_async(f, list(range(n)))
        print(f"sent: {n}")
        print(f"got: {len(r.get())}")

Alors que multiprocessing.Poolvous permet de vérifier si les résultats sont prêts en utilisant .ready()le résultat d'un appel apply_async()ou map_async(), vous ne pouvez pas obtenir un résultat partiel à partir de map_async(). Cependant, si vous souhaitez traiter les résultats individuels dès qu'ils sont prêts, vous pouvez envisager d'appeler apply_async()avec une fonction de rappel qui gère le résultat.