Verwenden Sie diese Methoden, um die Leistung Ihrer gleichzeitigen Python-Aufgaben zu verbessern

Apr 19 2023
Best Practices für asyncio.gather, asyncio.as_completed und asyncio.wait
Wo das Problem liegt Es war schon immer so, dass die Multithread-Leistung von Python aufgrund von GIL nie die Erwartungen erfüllt hat. Also seit Version 3.
Foto von Aleksandr Popov auf Unsplash

Wo das Problem liegt

Es war schon immer so, dass die Multithread-Leistung von Python aufgrund von GIL nie die Erwartungen erfüllt hat .

Daher hat Python seit Version 3.4 das asyncio-Paket eingeführt, um IO-gebundene Aufgaben durch Parallelität gleichzeitig auszuführen. Nach mehreren Iterationen haben die Asyncio-APIs sehr gut funktioniert, und die Leistung gleichzeitiger Aufgaben hat sich im Vergleich zur Multithread-Version dramatisch verbessert.

Es gibt jedoch immer noch viele Fehler, die Programmierer bei der Verwendung von asyncio machen:

Ein Fehler, wie in der folgenden Abbildung gezeigt, besteht darin, die await-Coroutinenmethode direkt so zu verwenden, dass der Aufruf einer gleichzeitigen Aufgabe von asynchron in synchron geändert wird, wodurch letztendlich die Parallelitätsfunktion verloren geht.

Ein weiterer Fehler wird in der folgenden Abbildung gezeigt, obwohl der Programmierer erkennt, dass er verwenden muss, create_taskum eine Aufgabe zu erstellen, die im Hintergrund ausgeführt werden soll. Die folgende Art, nacheinander auf Aufgaben zu warten, verwandelt jedoch die Aufgaben mit unterschiedlichen Zeitvorgaben in ein geordnetes Warten.

Dieser Code wartet darauf, dass task_1 zuerst beendet wird, unabhängig davon, ob task_2 zuerst beendet wird.

Was ist gleichzeitige Aufgabenausführung?

Was ist also eine echte nebenläufige Aufgabe? Lassen Sie uns ein Diagramm zur Veranschaulichung verwenden:

Egal wie viele Aufgaben wir hervorbringen, wir werden uns irgendwann wieder anschließen müssen. Bild vom Autor

Wie das Diagramm zeigt, sollte ein nebenläufiger Prozess aus zwei Teilen bestehen: Starten der Hintergrundaufgabe, erneutes Verbinden der Hintergrundaufgabe mit der Hauptfunktion und Abrufen des Ergebnisses.

Die meisten Leser werden bereits wissen, wie man create_taskeine Hintergrundaufgabe startet. Heute werde ich einige Möglichkeiten vorstellen, wie Sie auf den Abschluss einer Hintergrundaufgabe warten können, sowie die Best Practices für jede.

Einstieg

Bevor wir mit der Einführung der heutigen Hauptfigur beginnen, müssen wir eine asynchrone Beispielmethode vorbereiten, um einen IO-gebundenen Methodenaufruf zu simulieren, sowie eine benutzerdefinierte AsyncException, die verwendet werden kann, um freundlicherweise eine Ausnahmemeldung auszulösen, wenn der Test eine Ausnahme auslöst:

Vergleich von Methoden zur gleichzeitigen Ausführung

Nachdem wir die Vorbereitungen getroffen haben, ist es an der Zeit, die Tagesreise anzutreten und sich anzuschnallen.

1. asyncio.sammeln

asyncio.gatherkann verwendet werden, um eine Reihe von Hintergrundaufgaben zu starten, auf ihre Ausführung zu warten und eine Ergebnisliste zu erhalten:

asyncio.gather, obwohl es eine Gruppe von Hintergrundaufgaben bildet, kann eine Liste oder Sammlung nicht direkt als Argument akzeptieren. Wenn Sie eine Liste mit Hintergrundaufgaben übergeben müssen, entpacken Sie diese bitte.

asyncio.gathernimmt ein return_exceptionsArgument. Wenn der Wert von return_exceptionFalse ist und eine Hintergrundaufgabe eine Ausnahme auslöst, wird diese an den Aufrufer der Gather-Methode ausgegeben. Und die Ergebnisliste der Gather-Methode ist leer.

Bildschirmfoto. Bild vom Autor

Wenn der Wert von return_exceptionTrue ist, wirken sich Ausnahmen, die von Hintergrundaufgaben ausgelöst werden, nicht auf die Ausführung anderer Aufgaben aus und werden schließlich in der Ergebnisliste zusammengeführt und zusammen zurückgegeben.

results = await asyncio.gather(*aws, return_exceptions=True)

      
                
Screenshot. Image by Author

gatherDer Timeout-Parameter kann jedoch nicht direkt eingestellt werden. Wenn Sie für alle laufenden Aufgaben eine Zeitüberschreitung festlegen müssen, verwenden Sie diese Pose, die nicht elegant genug ist.

2. asyncio.as_completed

Manchmal müssen wir die folgende Aktion sofort nach Abschluss einer Hintergrundaufgabe starten. Wenn wir beispielsweise einige Daten crawlen und das maschinelle Lernmodell sofort zur Berechnung aufrufen, kann die gatherMethode unsere Anforderungen nicht erfüllen, aber wir können die as_completedMethode verwenden.

Bevor wir die Methode verwenden asyncio.as_completed, sehen wir uns den Quellcode dieser Methode an.

# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
  # ...
  for f in todo:
      f.add_done_callback(_on_completion)
  if todo and timeout is not None:
      timeout_handle = loop.call_later(timeout, _on_timeout)
  for _ in range(len(todo)):
      yield _wait_for_one()

as_completedakzeptiert das timeoutArgument, und die aktuell iterierte Aufgabe nach dem Timeout löst aus asyncio.TimeoutError:

Bildschirmfoto. Bild vom Autor

as_completedist viel flexibler als gatherbeim Umgang mit den Ergebnissen der Aufgabenausführung, aber es ist schwierig, während des Wartens neue Aufgaben zur ursprünglichen Aufgabenliste hinzuzufügen.

3. asyncio.warten

asyncio.waitwird genauso aufgerufen wie as_completed, gibt aber ein Tupel mit zwei Sätzen zurück: doneund pending. donehält die Aufgaben, die fertig ausgeführt wurden, und pendinghält die noch laufenden Aufgaben.

asyncio.waitakzeptiert einen return_whenParameter, der drei aufgezählte Werte annehmen kann:

  • Wenn return_whenist asyncio.ALL_COMPLETED, donespeichert alle abgeschlossenen Aufgaben und pendingist leer.
  • Wenn return_whenist asyncio.FIRST_COMPLETED, doneenthält alle abgeschlossenen Aufgaben und pendingenthält die noch laufenden Aufgaben.
  • Bildschirmfoto. Bild vom Autor
  • Wenn return_whenist asyncio.FIRST_EXCEPTION, donespeichert die Aufgaben, die Ausnahmen ausgelöst und die Ausführung abgeschlossen haben, und pendingenthält die noch laufenden Aufgaben.

Bildschirmfoto. Bild vom Autor

4. asyncio.Aufgabengruppe

In Python 3.11 hat asyncio die neue TaskGroupAPI eingeführt, die es Python offiziell ermöglicht, Structured Concurrency zu unterstützen . Mit dieser Funktion können Sie den Lebenszyklus gleichzeitiger Aufgaben auf pythonischere Weise verwalten. Aus Platzgründen werde ich hier nicht zu sehr ins Detail gehen, aber interessierte Leser können auf meinen Artikel verweisen:

Abschluss

Dieser Artikel stellte die asyncio.gather, asyncio.as_completedund asyncio.waitAPIs vor und überprüfte auch die neue asyncio.TaskGroupFunktion, die in Python 3.11 eingeführt wurde.

Die Verwendung dieser Methoden zur Verwaltung von Hintergrundaufgaben gemäß den tatsächlichen Anforderungen kann unsere asynchrone gleichzeitige Programmierung flexibler machen.

Aufgrund der Erfahrung gibt es zwangsläufig Auslassungen in der Darstellung dieses Artikels, also zögern Sie nicht, während des Lesevorgangs Kommentare zu hinterlassen, und ich werde aktiv antworten.