Parallelität in Python - Pool von Threads

Angenommen, wir müssten eine große Anzahl von Threads für unsere Multithread-Aufgaben erstellen. Dies wäre rechenintensiv, da aufgrund zu vieler Threads viele Leistungsprobleme auftreten können. Ein Hauptproblem könnte darin bestehen, dass der Durchsatz begrenzt wird. Wir können dieses Problem lösen, indem wir einen Pool von Threads erstellen. Ein Thread-Pool kann als die Gruppe von vorinstanziierten und inaktiven Threads definiert werden, die bereit sind, Arbeit zu leisten. Das Erstellen eines Thread-Pools wird dem Instanziieren neuer Threads für jede Aufgabe vorgezogen, wenn eine große Anzahl von Aufgaben ausgeführt werden muss. Ein Thread-Pool kann die gleichzeitige Ausführung einer großen Anzahl von Threads wie folgt verwalten:

  • Wenn ein Thread in einem Thread-Pool seine Ausführung abgeschlossen hat, kann dieser Thread wiederverwendet werden.

  • Wenn ein Thread beendet wird, wird ein anderer Thread erstellt, um diesen Thread zu ersetzen.

Python-Modul - Concurrent.futures

Die Python-Standardbibliothek enthält die concurrent.futuresModul. Dieses Modul wurde in Python 3.2 hinzugefügt, um den Entwicklern eine allgemeine Benutzeroberfläche zum Starten asynchroner Aufgaben bereitzustellen. Es ist eine Abstraktionsschicht über den Threading- und Multiprocessing-Modulen von Python, um die Schnittstelle zum Ausführen der Aufgaben mithilfe eines Pools von Threads oder Prozessen bereitzustellen.

In unseren folgenden Abschnitten lernen wir die verschiedenen Klassen des Moduls concurrent.futures kennen.

Executor-Klasse

Executorist eine abstrakte Klasse der concurrent.futuresPython-Modul. Es kann nicht direkt verwendet werden und wir müssen eine der folgenden konkreten Unterklassen verwenden -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor - Eine konkrete Unterklasse

Es ist eine der konkreten Unterklassen der Executor-Klasse. Die Unterklasse verwendet Multithreading und wir erhalten einen Thread-Pool zum Senden der Aufgaben. Dieser Pool weist den verfügbaren Threads Aufgaben zu und plant deren Ausführung.

Wie erstelle ich einen ThreadPoolExecutor?

Mit der Hilfe von concurrent.futures Modul und seine konkrete Unterklasse Executorkönnen wir leicht einen Pool von Threads erstellen. Dazu müssen wir a konstruierenThreadPoolExecutormit der Anzahl der Threads, die wir im Pool haben wollen. Standardmäßig ist die Nummer 5. Dann können wir eine Aufgabe an den Thread-Pool senden. Wenn wirsubmit() eine Aufgabe, wir bekommen zurück a Future. Das Future-Objekt hat eine Methode namensdone(), was zeigt, ob sich die Zukunft aufgelöst hat. Damit wurde ein Wert für das jeweilige zukünftige Objekt festgelegt. Wenn eine Aufgabe abgeschlossen ist, setzt der Thread-Pool-Executor den Wert auf das zukünftige Objekt.

Beispiel

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Ausgabe

False
True
Completed

Im obigen Beispiel a ThreadPoolExecutorwurde mit 5 Fäden konstruiert. Anschließend wird eine Aufgabe, die 2 Sekunden wartet, bevor die Nachricht gesendet wird, an den Thread-Pool-Executor gesendet. Wie aus der Ausgabe ersichtlich, wird die Aufgabe erst nach 2 Sekunden abgeschlossen, also der erste Aufruf vondone()wird False zurückgeben. Nach 2 Sekunden ist die Aufgabe erledigt und wir erhalten das Ergebnis der Zukunft, indem wir das aufrufenresult() Methode darauf.

Instanziieren von ThreadPoolExecutor - Context Manager

Ein anderer Weg, um zu instanziieren ThreadPoolExecutorist mit Hilfe des Kontextmanagers. Es funktioniert ähnlich wie im obigen Beispiel. Der Hauptvorteil der Verwendung von Kontextmanager besteht darin, dass er syntaktisch gut aussieht. Die Instanziierung kann mit Hilfe des folgenden Codes erfolgen:

with ThreadPoolExecutor(max_workers = 5) as executor

Beispiel

Das folgende Beispiel stammt aus den Python-Dokumenten. In diesem Beispiel zunächst dieconcurrent.futuresModul muss importiert werden. Dann eine Funktion namensload_url()wird erstellt, wodurch die angeforderte URL geladen wird. Die Funktion erstellt dannThreadPoolExecutormit den 5 Threads im Pool. DasThreadPoolExecutorwurde als Kontextmanager verwendet. Wir können das Ergebnis der Zukunft erhalten, indem wir die anrufenresult() Methode darauf.

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

Ausgabe

Es folgt die Ausgabe des obigen Python-Skripts -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Verwendung der Funktion Executor.map ()

Der Python map()Funktion ist in einer Reihe von Aufgaben weit verbreitet. Eine solche Aufgabe besteht darin, auf jedes Element in iterables eine bestimmte Funktion anzuwenden. Ebenso können wir alle Elemente eines Iterators einer Funktion zuordnen und diese als unabhängige Jobs an out sendenThreadPoolExecutor. Betrachten Sie das folgende Beispiel eines Python-Skripts, um zu verstehen, wie die Funktion funktioniert.

Beispiel

In diesem Beispiel unten wird die Kartenfunktion verwendet, um das anzuwenden square() Funktion für jeden Wert im Werte-Array.

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

Ausgabe

Das obige Python-Skript generiert die folgende Ausgabe:

4
9
16
25