Concurrence en Python - Pool de processus

Le pool de processus peut être créé et utilisé de la même manière que nous avons créé et utilisé le pool de threads. Le pool de processus peut être défini comme le groupe de processus pré-instanciés et inactifs, prêts à recevoir du travail. La création d'un pool de processus est préférable à l'instanciation de nouveaux processus pour chaque tâche lorsque nous devons effectuer un grand nombre de tâches.

Module Python - Concurrent.futures

La bibliothèque standard Python a un module appelé le concurrent.futures. Ce module a été ajouté dans Python 3.2 pour fournir aux développeurs une interface de haut niveau pour le lancement de tâches asynchrones. Il s'agit d'une couche d'abstraction au-dessus des modules de thread et de multiprocessus de Python pour fournir l'interface pour exécuter les tâches à l'aide d'un pool de threads ou de processus.

Dans nos sections suivantes, nous examinerons les différentes sous-classes du module concurrent.futures.

Classe d'exécuteur

Executor est une classe abstraite du concurrent.futuresModule Python. Il ne peut pas être utilisé directement et nous devons utiliser l'une des sous-classes concrètes suivantes -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor - Une sous-classe concrète

C'est l'une des sous-classes concrètes de la classe Executor. Il utilise le multi-traitement et nous obtenons un pool de processus pour soumettre les tâches. Ce pool affecte des tâches aux processus disponibles et planifie leur exécution.

Comment créer un ProcessPoolExecutor?

Avec l'aide du concurrent.futures module et sa sous-classe concrète Executor, nous pouvons facilement créer un pool de processus. Pour cela, nous devons construire unProcessPoolExecutoravec le nombre de processus que nous voulons dans le pool. Par défaut, le nombre est 5. Ceci est suivi de la soumission d'une tâche au pool de processus.

Exemple

Nous allons maintenant considérer le même exemple que nous avons utilisé lors de la création du pool de threads, la seule différence étant que nous allons maintenant utiliser ProcessPoolExecutor au lieu de ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Production

False
False
Completed

Dans l'exemple ci-dessus, un processusPoolExecutora été construit avec 5 fils. Ensuite, une tâche, qui attendra 2 secondes avant de donner le message, est soumise à l'exécuteur du pool de processus. Comme le montre la sortie, la tâche ne se termine pas avant 2 secondes, donc le premier appel àdone()retournera False. Au bout de 2 secondes, la tâche est terminée et nous obtenons le résultat du futur en appelant leresult() méthode là-dessus.

Instanciation de ProcessPoolExecutor - Gestionnaire de contexte

Une autre façon d'instancier ProcessPoolExecutor consiste à utiliser le gestionnaire de contexte. Cela fonctionne de manière similaire à la méthode utilisée dans l'exemple ci-dessus. Le principal avantage de l'utilisation du gestionnaire de contexte est qu'il a une bonne syntaxe. L'instanciation peut être effectuée à l'aide du code suivant -

with ProcessPoolExecutor(max_workers = 5) as executor

Exemple

Pour une meilleure compréhension, nous prenons le même exemple que celui utilisé lors de la création d'un pool de threads. Dans cet exemple, nous devons commencer par importer leconcurrent.futuresmodule. Puis une fonction nomméeload_url()est créé qui chargera l'url demandée. leProcessPoolExecutorest alors créé avec le nombre de 5 threads dans le pool. Le processusPoolExecutora été utilisé comme gestionnaire de contexte. Nous pouvons obtenir le résultat du futur en appelant leresult() méthode là-dessus.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

Production

Le script Python ci-dessus générera la sortie suivante -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Utilisation de la fonction Executor.map ()

Le Python map()La fonction est largement utilisée pour effectuer un certain nombre de tâches. L'une de ces tâches consiste à appliquer une certaine fonction à chaque élément des itérables. De même, nous pouvons mapper tous les éléments d'un itérateur à une fonction et les soumettre en tant que jobs indépendants auProcessPoolExecutor. Considérez l'exemple suivant de script Python pour comprendre cela.

Exemple

Nous considérerons le même exemple que nous avons utilisé lors de la création d'un pool de threads en utilisant le Executor.map()fonction. Dans l'exemple donné ci-dessous, la fonction map est utilisée pour appliquersquare() fonction à chaque valeur du tableau de valeurs.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

Production

Le script Python ci-dessus générera la sortie suivante

4
9
16
25

Quand utiliser ProcessPoolExecutor et ThreadPoolExecutor?

Maintenant que nous avons étudié les deux classes Executor - ThreadPoolExecutor et ProcessPoolExecutor, nous devons savoir quand utiliser quel exécuteur. Nous devons choisir ProcessPoolExecutor en cas de charges de travail liées au processeur et ThreadPoolExecutor en cas de charges de travail liées aux E / S.

Si nous utilisons ProcessPoolExecutor, alors nous n'avons pas à nous soucier de GIL car il utilise le multitraitement. De plus, le temps d'exécution sera moindre par rapport àThreadPoolExecution. Considérez l'exemple de script Python suivant pour comprendre cela.

Exemple

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Production

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Production

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

À partir des sorties des deux programmes ci-dessus, nous pouvons voir la différence de temps d'exécution lors de l'utilisation ProcessPoolExecutor et ThreadPoolExecutor.