Simultaneidade em Python - pool de processos
O pool de processos pode ser criado e usado da mesma maneira que criamos e usamos o pool de threads. O pool de processos pode ser definido como o grupo de processos pré-instanciados e ociosos, que estão prontos para receber trabalho. A criação de pool de processos é preferível em vez de instanciar novos processos para cada tarefa quando precisamos fazer um grande número de tarefas.
Módulo Python - Concurrent.futures
A biblioteca padrão do Python tem um módulo chamado concurrent.futures. Este módulo foi adicionado ao Python 3.2 para fornecer aos desenvolvedores uma interface de alto nível para iniciar tarefas assíncronas. É uma camada de abstração no topo dos módulos de threading e multiprocessamento do Python para fornecer a interface para executar as tarefas usando pool de thread ou processos.
Em nossas seções subsequentes, examinaremos as diferentes subclasses do módulo concurrent.futures.
Classe Executor
Executor é uma classe abstrata de concurrent.futuresMódulo Python. Não pode ser usado diretamente e precisamos usar uma das seguintes subclasses concretas -
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor - uma subclasse concreta
É uma das subclasses concretas da classe Executor. Ele usa multiprocessamento e obtemos um pool de processos para enviar as tarefas. Este pool atribui tarefas aos processos disponíveis e os programa para execução.
Como criar um ProcessPoolExecutor?
Com a ajuda do concurrent.futures módulo e sua subclasse concreta Executor, podemos criar facilmente um pool de processos. Para isso, precisamos construir umProcessPoolExecutorcom o número de processos que queremos no pool. Por padrão, o número é 5. Isso é seguido pelo envio de uma tarefa ao pool de processos.
Exemplo
Vamos agora considerar o mesmo exemplo que usamos ao criar pool de threads, a única diferença é que agora usaremos ProcessPoolExecutor ao invés de ThreadPoolExecutor .
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ProcessPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
Resultado
False
False
Completed
No exemplo acima, um ProcessoPoolExecutorfoi construído com 5 fios. Em seguida, uma tarefa, que aguardará 2 segundos antes de dar a mensagem, é enviada ao executor do pool de processos. Como visto na saída, a tarefa não é concluída até 2 segundos, então a primeira chamada paradone()retornará False. Após 2 segundos, a tarefa está concluída e obtemos o resultado do futuro chamando oresult() método sobre ele.
Instanciar ProcessPoolExecutor - Gerenciador de Contexto
Outra maneira de instanciar ProcessPoolExecutor é com a ajuda do gerenciador de contexto. Funciona de forma semelhante ao método usado no exemplo acima. A principal vantagem de usar o gerenciador de contexto é que ele parece sintaticamente bom. A instanciação pode ser feita com a ajuda do seguinte código -
with ProcessPoolExecutor(max_workers = 5) as executor
Exemplo
Para melhor compreensão, estamos usando o mesmo exemplo usado durante a criação do pool de threads. Neste exemplo, precisamos começar importando oconcurrent.futuresmódulo. Então, uma função chamadaload_url()é criado, o que carregará o url solicitado. oProcessPoolExecutoré então criado com o número 5 de threads no pool. O processoPoolExecutorfoi utilizado como gerenciador de contexto. Podemos obter o resultado do futuro chamando oresult() método sobre ele.
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
main()
Resultado
O script Python acima irá gerar a seguinte saída -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
Uso da função Executor.map ()
O Python map()função é amplamente utilizada para realizar uma série de tarefas. Uma dessas tarefas é aplicar uma determinada função a cada elemento dentro dos iteráveis. Da mesma forma, podemos mapear todos os elementos de um iterador para uma função e enviá-los como trabalhos independentes para oProcessPoolExecutor. Considere o seguinte exemplo de script Python para entender isso.
Exemplo
Vamos considerar o mesmo exemplo que usamos ao criar pool de threads usando o Executor.map()função. No exemplo dado abaixo, a função de mapa é usada para aplicarsquare() função para cada valor na matriz de valores.
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ProcessPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
Resultado
O script Python acima irá gerar a seguinte saída
4
9
16
25
Quando usar ProcessPoolExecutor e ThreadPoolExecutor?
Agora que estudamos sobre as classes de Executor - ThreadPoolExecutor e ProcessPoolExecutor, precisamos saber quando usar qual executor. Precisamos escolher ProcessPoolExecutor no caso de cargas de trabalho vinculadas à CPU e ThreadPoolExecutor no caso de cargas de trabalho vinculadas a E / S.
Se usarmos ProcessPoolExecutor, então não precisamos nos preocupar com o GIL porque ele usa multiprocessamento. Além disso, o tempo de execução será menor quando comparado comThreadPoolExecution. Considere o seguinte exemplo de script Python para entender isso.
Exemplo
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
Resultado
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
Resultado
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
A partir das saídas de ambos os programas acima, podemos ver a diferença do tempo de execução ao usar ProcessPoolExecutor e ThreadPoolExecutor.