Параллелизм в Python - пул процессов

Пул процессов можно создавать и использовать так же, как мы создавали и использовали пул потоков. Пул процессов можно определить как группу предварительно созданных и бездействующих процессов, которые готовы к работе. Создание пула процессов предпочтительнее, чем создание экземпляров новых процессов для каждой задачи, когда нам нужно выполнить большое количество задач.

Модуль Python - Concurrent.futures

В стандартной библиотеке Python есть модуль, называемый concurrent.futures. Этот модуль был добавлен в Python 3.2 для предоставления разработчикам высокоуровневого интерфейса для запуска асинхронных задач. Это уровень абстракции поверх модулей потоковой обработки и многопроцессорности Python для предоставления интерфейса для выполнения задач с использованием пула потоков или процессов.

В наших последующих разделах мы рассмотрим различные подклассы модуля concurrent.futures.

Класс исполнителя

Executor это абстрактный класс concurrent.futuresМодуль Python. Его нельзя использовать напрямую, и нам нужно использовать один из следующих конкретных подклассов -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor - конкретный подкласс

Это один из конкретных подклассов класса Executor. Он использует многопроцессорную обработку, и мы получаем пул процессов для отправки задач. Этот пул назначает задачи доступным процессам и планирует их запуск.

Как создать ProcessPoolExecutor?

С помощью concurrent.futures модуль и его конкретный подкласс Executor, мы можем легко создать пул процессов. Для этого нам нужно построитьProcessPoolExecutorс количеством процессов, которые мы хотим в пуле. По умолчанию это число 5. После этого выполняется отправка задачи в пул процессов.

пример

Теперь мы рассмотрим тот же пример, который мы использовали при создании пула потоков, с той лишь разницей, что теперь мы будем использовать ProcessPoolExecutor вместо ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Вывод

False
False
Completed

В приведенном выше примере процессPoolExecutorбыл построен с 5 потоками. Затем задача, которая будет ждать 2 секунды перед отправкой сообщения, отправляется исполнителю пула процессов. Как видно из выходных данных, задача не выполняется до 2 секунд, поэтому первый вызовdone()вернет False. Через 2 секунды задача выполнена, и мы получаем результат будущего, вызываяresult() метод на нем.

Создание экземпляра ProcessPoolExecutor - диспетчера контекста

Другой способ создать экземпляр ProcessPoolExecutor - использовать диспетчер контекста. Он работает аналогично методу, использованному в приведенном выше примере. Основное преимущество использования диспетчера контекста в том, что он синтаксически хорошо выглядит. Создание экземпляра может быть выполнено с помощью следующего кода -

with ProcessPoolExecutor(max_workers = 5) as executor

пример

Для лучшего понимания мы берем тот же пример, который использовался при создании пула потоков. В этом примере нам нужно начать с импортаconcurrent.futuresмодуль. Затем функция с именемload_url()создается, который загрузит запрошенный URL. ВProcessPoolExecutorзатем создается 5 потоков в пуле. ПроцессPoolExecutorбыл использован как менеджер контекста. Мы можем получить результат будущего, позвонивresult() метод на нем.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

Вывод

Вышеупомянутый скрипт Python сгенерирует следующий вывод -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Использование функции Executor.map ()

Питон map()Функция широко используется для выполнения ряда задач. Одна из таких задач - применить определенную функцию к каждому элементу в итерациях. Точно так же мы можем сопоставить все элементы итератора с функцией и отправить их как независимые задания вProcessPoolExecutor. Чтобы понять это, рассмотрим следующий пример скрипта Python.

пример

Мы рассмотрим тот же пример, который мы использовали при создании пула потоков с использованием Executor.map()функция. В приведенном ниже примере функция карты используется для примененияsquare() для каждого значения в массиве значений.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

Вывод

Приведенный выше сценарий Python сгенерирует следующий вывод

4
9
16
25

Когда использовать ProcessPoolExecutor и ThreadPoolExecutor?

Теперь, когда мы изучили оба класса Executor - ThreadPoolExecutor и ProcessPoolExecutor, нам нужно знать, когда использовать какой исполнитель. Нам нужно выбрать ProcessPoolExecutor в случае рабочих нагрузок, связанных с процессором, и ThreadPoolExecutor в случае рабочих нагрузок, связанных с вводом-выводом.

Если мы используем ProcessPoolExecutor, то нам не нужно беспокоиться о GIL, потому что он использует многопроцессорность. Более того, время выполнения будет меньше по сравнению сThreadPoolExecution. Чтобы понять это, рассмотрим следующий пример сценария Python.

пример

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Вывод

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Вывод

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

Из результатов обеих программ выше мы можем увидеть разницу во времени выполнения при использовании ProcessPoolExecutor и ThreadPoolExecutor.