Параллелизм в Python - пул процессов
Пул процессов можно создавать и использовать так же, как мы создавали и использовали пул потоков. Пул процессов можно определить как группу предварительно созданных и бездействующих процессов, которые готовы к работе. Создание пула процессов предпочтительнее, чем создание экземпляров новых процессов для каждой задачи, когда нам нужно выполнить большое количество задач.
Модуль Python - Concurrent.futures
В стандартной библиотеке Python есть модуль, называемый concurrent.futures. Этот модуль был добавлен в Python 3.2 для предоставления разработчикам высокоуровневого интерфейса для запуска асинхронных задач. Это уровень абстракции поверх модулей потоковой обработки и многопроцессорности Python для предоставления интерфейса для выполнения задач с использованием пула потоков или процессов.
В наших последующих разделах мы рассмотрим различные подклассы модуля concurrent.futures.
Класс исполнителя
Executor это абстрактный класс concurrent.futuresМодуль Python. Его нельзя использовать напрямую, и нам нужно использовать один из следующих конкретных подклассов -
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor - конкретный подкласс
Это один из конкретных подклассов класса Executor. Он использует многопроцессорную обработку, и мы получаем пул процессов для отправки задач. Этот пул назначает задачи доступным процессам и планирует их запуск.
Как создать ProcessPoolExecutor?
С помощью concurrent.futures модуль и его конкретный подкласс Executor, мы можем легко создать пул процессов. Для этого нам нужно построитьProcessPoolExecutorс количеством процессов, которые мы хотим в пуле. По умолчанию это число 5. После этого выполняется отправка задачи в пул процессов.
пример
Теперь мы рассмотрим тот же пример, который мы использовали при создании пула потоков, с той лишь разницей, что теперь мы будем использовать ProcessPoolExecutor вместо ThreadPoolExecutor .
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ProcessPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
Вывод
False
False
Completed
В приведенном выше примере процессPoolExecutorбыл построен с 5 потоками. Затем задача, которая будет ждать 2 секунды перед отправкой сообщения, отправляется исполнителю пула процессов. Как видно из выходных данных, задача не выполняется до 2 секунд, поэтому первый вызовdone()вернет False. Через 2 секунды задача выполнена, и мы получаем результат будущего, вызываяresult() метод на нем.
Создание экземпляра ProcessPoolExecutor - диспетчера контекста
Другой способ создать экземпляр ProcessPoolExecutor - использовать диспетчер контекста. Он работает аналогично методу, использованному в приведенном выше примере. Основное преимущество использования диспетчера контекста в том, что он синтаксически хорошо выглядит. Создание экземпляра может быть выполнено с помощью следующего кода -
with ProcessPoolExecutor(max_workers = 5) as executor
пример
Для лучшего понимания мы берем тот же пример, который использовался при создании пула потоков. В этом примере нам нужно начать с импортаconcurrent.futuresмодуль. Затем функция с именемload_url()создается, который загрузит запрошенный URL. ВProcessPoolExecutorзатем создается 5 потоков в пуле. ПроцессPoolExecutorбыл использован как менеджер контекста. Мы можем получить результат будущего, позвонивresult() метод на нем.
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
main()
Вывод
Вышеупомянутый скрипт Python сгенерирует следующий вывод -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
Использование функции Executor.map ()
Питон map()Функция широко используется для выполнения ряда задач. Одна из таких задач - применить определенную функцию к каждому элементу в итерациях. Точно так же мы можем сопоставить все элементы итератора с функцией и отправить их как независимые задания вProcessPoolExecutor. Чтобы понять это, рассмотрим следующий пример скрипта Python.
пример
Мы рассмотрим тот же пример, который мы использовали при создании пула потоков с использованием Executor.map()функция. В приведенном ниже примере функция карты используется для примененияsquare() для каждого значения в массиве значений.
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ProcessPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
Вывод
Приведенный выше сценарий Python сгенерирует следующий вывод
4
9
16
25
Когда использовать ProcessPoolExecutor и ThreadPoolExecutor?
Теперь, когда мы изучили оба класса Executor - ThreadPoolExecutor и ProcessPoolExecutor, нам нужно знать, когда использовать какой исполнитель. Нам нужно выбрать ProcessPoolExecutor в случае рабочих нагрузок, связанных с процессором, и ThreadPoolExecutor в случае рабочих нагрузок, связанных с вводом-выводом.
Если мы используем ProcessPoolExecutor, то нам не нужно беспокоиться о GIL, потому что он использует многопроцессорность. Более того, время выполнения будет меньше по сравнению сThreadPoolExecution. Чтобы понять это, рассмотрим следующий пример сценария Python.
пример
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
Вывод
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
Вывод
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
Из результатов обеих программ выше мы можем увидеть разницу во времени выполнения при использовании ProcessPoolExecutor и ThreadPoolExecutor.