Concurrence en Python - Pool de processus
Le pool de processus peut être créé et utilisé de la même manière que nous avons créé et utilisé le pool de threads. Le pool de processus peut être défini comme le groupe de processus pré-instanciés et inactifs, prêts à recevoir du travail. La création d'un pool de processus est préférable à l'instanciation de nouveaux processus pour chaque tâche lorsque nous devons effectuer un grand nombre de tâches.
Module Python - Concurrent.futures
La bibliothèque standard Python a un module appelé le concurrent.futures. Ce module a été ajouté dans Python 3.2 pour fournir aux développeurs une interface de haut niveau pour le lancement de tâches asynchrones. Il s'agit d'une couche d'abstraction au-dessus des modules de thread et de multiprocessus de Python pour fournir l'interface pour exécuter les tâches à l'aide d'un pool de threads ou de processus.
Dans nos sections suivantes, nous examinerons les différentes sous-classes du module concurrent.futures.
Classe d'exécuteur
Executor est une classe abstraite du concurrent.futuresModule Python. Il ne peut pas être utilisé directement et nous devons utiliser l'une des sous-classes concrètes suivantes -
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor - Une sous-classe concrète
C'est l'une des sous-classes concrètes de la classe Executor. Il utilise le multi-traitement et nous obtenons un pool de processus pour soumettre les tâches. Ce pool affecte des tâches aux processus disponibles et planifie leur exécution.
Comment créer un ProcessPoolExecutor?
Avec l'aide du concurrent.futures module et sa sous-classe concrète Executor, nous pouvons facilement créer un pool de processus. Pour cela, nous devons construire unProcessPoolExecutoravec le nombre de processus que nous voulons dans le pool. Par défaut, le nombre est 5. Ceci est suivi de la soumission d'une tâche au pool de processus.
Exemple
Nous allons maintenant considérer le même exemple que nous avons utilisé lors de la création du pool de threads, la seule différence étant que nous allons maintenant utiliser ProcessPoolExecutor au lieu de ThreadPoolExecutor .
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ProcessPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
Production
False
False
Completed
Dans l'exemple ci-dessus, un processusPoolExecutora été construit avec 5 fils. Ensuite, une tâche, qui attendra 2 secondes avant de donner le message, est soumise à l'exécuteur du pool de processus. Comme le montre la sortie, la tâche ne se termine pas avant 2 secondes, donc le premier appel àdone()retournera False. Au bout de 2 secondes, la tâche est terminée et nous obtenons le résultat du futur en appelant leresult() méthode là-dessus.
Instanciation de ProcessPoolExecutor - Gestionnaire de contexte
Une autre façon d'instancier ProcessPoolExecutor consiste à utiliser le gestionnaire de contexte. Cela fonctionne de manière similaire à la méthode utilisée dans l'exemple ci-dessus. Le principal avantage de l'utilisation du gestionnaire de contexte est qu'il a une bonne syntaxe. L'instanciation peut être effectuée à l'aide du code suivant -
with ProcessPoolExecutor(max_workers = 5) as executor
Exemple
Pour une meilleure compréhension, nous prenons le même exemple que celui utilisé lors de la création d'un pool de threads. Dans cet exemple, nous devons commencer par importer leconcurrent.futuresmodule. Puis une fonction nomméeload_url()est créé qui chargera l'url demandée. leProcessPoolExecutorest alors créé avec le nombre de 5 threads dans le pool. Le processusPoolExecutora été utilisé comme gestionnaire de contexte. Nous pouvons obtenir le résultat du futur en appelant leresult() méthode là-dessus.
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
main()
Production
Le script Python ci-dessus générera la sortie suivante -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
Utilisation de la fonction Executor.map ()
Le Python map()La fonction est largement utilisée pour effectuer un certain nombre de tâches. L'une de ces tâches consiste à appliquer une certaine fonction à chaque élément des itérables. De même, nous pouvons mapper tous les éléments d'un itérateur à une fonction et les soumettre en tant que jobs indépendants auProcessPoolExecutor. Considérez l'exemple suivant de script Python pour comprendre cela.
Exemple
Nous considérerons le même exemple que nous avons utilisé lors de la création d'un pool de threads en utilisant le Executor.map()fonction. Dans l'exemple donné ci-dessous, la fonction map est utilisée pour appliquersquare() fonction à chaque valeur du tableau de valeurs.
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ProcessPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
Production
Le script Python ci-dessus générera la sortie suivante
4
9
16
25
Quand utiliser ProcessPoolExecutor et ThreadPoolExecutor?
Maintenant que nous avons étudié les deux classes Executor - ThreadPoolExecutor et ProcessPoolExecutor, nous devons savoir quand utiliser quel exécuteur. Nous devons choisir ProcessPoolExecutor en cas de charges de travail liées au processeur et ThreadPoolExecutor en cas de charges de travail liées aux E / S.
Si nous utilisons ProcessPoolExecutor, alors nous n'avons pas à nous soucier de GIL car il utilise le multitraitement. De plus, le temps d'exécution sera moindre par rapport àThreadPoolExecution. Considérez l'exemple de script Python suivant pour comprendre cela.
Exemple
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
Production
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
Production
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
À partir des sorties des deux programmes ci-dessus, nous pouvons voir la différence de temps d'exécution lors de l'utilisation ProcessPoolExecutor et ThreadPoolExecutor.