Parallelität in Python - Pool von Prozessen

Der Prozesspool kann auf dieselbe Weise erstellt und verwendet werden, wie wir den Thread-Pool erstellt und verwendet haben. Der Prozesspool kann als die Gruppe von vorinstanziierten und inaktiven Prozessen definiert werden, die bereit sind, Arbeit zu leisten. Das Erstellen eines Prozesspools wird dem Instanziieren neuer Prozesse für jede Aufgabe vorgezogen, wenn eine große Anzahl von Aufgaben ausgeführt werden muss.

Python-Modul - Concurrent.futures

Die Python-Standardbibliothek verfügt über ein Modul namens concurrent.futures. Dieses Modul wurde in Python 3.2 hinzugefügt, um den Entwicklern eine allgemeine Benutzeroberfläche zum Starten asynchroner Aufgaben bereitzustellen. Es ist eine Abstraktionsschicht über den Threading- und Multiprocessing-Modulen von Python, um die Schnittstelle zum Ausführen der Aufgaben mithilfe eines Pools von Threads oder Prozessen bereitzustellen.

In unseren folgenden Abschnitten werden wir uns die verschiedenen Unterklassen des Moduls concurrent.futures ansehen.

Executor-Klasse

Executor ist eine abstrakte Klasse der concurrent.futuresPython-Modul. Es kann nicht direkt verwendet werden und wir müssen eine der folgenden konkreten Unterklassen verwenden -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor - Eine konkrete Unterklasse

Es ist eine der konkreten Unterklassen der Executor-Klasse. Es verwendet Multi-Processing und wir erhalten einen Pool von Prozessen zum Einreichen der Aufgaben. Dieser Pool weist den verfügbaren Prozessen Aufgaben zu und plant deren Ausführung.

Wie erstelle ich einen ProcessPoolExecutor?

Mit Hilfe der concurrent.futures Modul und seine konkrete Unterklasse Executorkönnen wir leicht einen Prozesspool erstellen. Dazu müssen wir a konstruierenProcessPoolExecutormit der Anzahl der Prozesse, die wir im Pool haben wollen. Standardmäßig ist die Nummer 5. Anschließend wird eine Aufgabe an den Prozesspool gesendet.

Beispiel

Wir werden nun dasselbe Beispiel betrachten, das wir beim Erstellen des Thread-Pools verwendet haben. Der einzige Unterschied besteht darin, dass wir es jetzt verwenden werden ProcessPoolExecutor Anstatt von ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Ausgabe

False
False
Completed

Im obigen Beispiel ein ProzessPoolExecutorwurde mit 5 Fäden konstruiert. Anschließend wird eine Aufgabe, die 2 Sekunden wartet, bevor die Nachricht gesendet wird, an den Prozesspool-Executor gesendet. Wie aus der Ausgabe ersichtlich, wird die Aufgabe erst nach 2 Sekunden abgeschlossen, also der erste Aufruf vondone()wird False zurückgeben. Nach 2 Sekunden ist die Aufgabe erledigt und wir erhalten das Ergebnis der Zukunft, indem wir das aufrufenresult() Methode darauf.

Instanziieren von ProcessPoolExecutor - Kontextmanager

Eine andere Möglichkeit, ProcessPoolExecutor zu instanziieren, ist die Verwendung des Kontextmanagers. Es funktioniert ähnlich wie im obigen Beispiel. Der Hauptvorteil der Verwendung von Kontextmanager besteht darin, dass er syntaktisch gut aussieht. Die Instanziierung kann mit Hilfe des folgenden Codes erfolgen:

with ProcessPoolExecutor(max_workers = 5) as executor

Beispiel

Zum besseren Verständnis verwenden wir dasselbe Beispiel wie beim Erstellen des Thread-Pools. In diesem Beispiel müssen wir zunächst das importierenconcurrent.futuresModul. Dann eine Funktion namensload_url()wird erstellt, wodurch die angeforderte URL geladen wird. DasProcessPoolExecutorwird dann mit der 5 Anzahl von Threads im Pool erstellt. Der ProzessPoolExecutorwurde als Kontextmanager verwendet. Wir können das Ergebnis der Zukunft erhalten, indem wir die anrufenresult() Methode darauf.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

Ausgabe

Das obige Python-Skript generiert die folgende Ausgabe:

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Verwendung der Funktion Executor.map ()

Der Python map()Funktion wird häufig verwendet, um eine Reihe von Aufgaben auszuführen. Eine solche Aufgabe besteht darin, auf jedes Element in iterables eine bestimmte Funktion anzuwenden. Ebenso können wir alle Elemente eines Iterators einer Funktion zuordnen und diese als unabhängige Jobs an die sendenProcessPoolExecutor. Betrachten Sie das folgende Beispiel eines Python-Skripts, um dies zu verstehen.

Beispiel

Wir werden das gleiche Beispiel betrachten, das wir beim Erstellen des Thread-Pools mit dem verwendet haben Executor.map()Funktion. In dem unten angegebenen Beispiel wird die Kartenfunktion zum Anwenden verwendetsquare() Funktion für jeden Wert im Werte-Array.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

Ausgabe

Das obige Python-Skript generiert die folgende Ausgabe

4
9
16
25

Wann sollten ProcessPoolExecutor und ThreadPoolExecutor verwendet werden?

Nachdem wir uns mit den beiden Executor-Klassen ThreadPoolExecutor und ProcessPoolExecutor befasst haben, müssen wir wissen, wann welcher Executor verwendet werden soll. Bei CPU-gebundenen Workloads müssen wir ProcessPoolExecutor und bei E / A-gebundenen Workloads ThreadPoolExecutor auswählen.

Wenn wir verwenden ProcessPoolExecutorDann brauchen wir uns keine Sorgen um GIL zu machen, da es Multiprocessing verwendet. Darüber hinaus ist die Ausführungszeit im Vergleich zu kürzerThreadPoolExecution. Betrachten Sie das folgende Python-Skriptbeispiel, um dies zu verstehen.

Beispiel

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Ausgabe

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Ausgabe

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

An den Ausgaben der beiden oben genannten Programme können wir den Unterschied der Ausführungszeit während der Verwendung erkennen ProcessPoolExecutor und ThreadPoolExecutor.