Python의 동시성-프로세스 풀

프로세스 풀은 스레드 풀을 만들고 사용한 것과 동일한 방식으로 만들고 사용할 수 있습니다. 프로세스 풀은 작업을받을 준비가 된 사전 인스턴스화 및 유휴 프로세스 그룹으로 정의 할 수 있습니다. 많은 작업을 수행해야 할 때 모든 작업에 대해 새 프로세스를 인스턴스화하는 것보다 프로세스 풀을 만드는 것이 좋습니다.

Python 모듈 – Concurrent.futures

Python 표준 라이브러리에는 concurrent.futures. 이 모듈은 개발자에게 비동기 작업을 시작하기위한 고급 인터페이스를 제공하기 위해 Python 3.2에 추가되었습니다. 스레드 또는 프로세스 풀을 사용하여 작업을 실행하기위한 인터페이스를 제공하기위한 Python의 스레딩 및 다중 처리 모듈 상단에있는 추상화 계층입니다.

이후 섹션에서는 concurrent.futures 모듈의 여러 하위 클래스를 살펴볼 것입니다.

실행자 클래스

Executor 의 추상 클래스입니다 concurrent.futuresPython 모듈. 직접 사용할 수 없으며 다음 구체적인 하위 클래스 중 하나를 사용해야합니다.

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor – 구체적인 하위 클래스

Executor 클래스의 구체적인 하위 클래스 중 하나입니다. 다중 처리를 사용하고 작업 제출을위한 프로세스 풀을 얻습니다. 이 풀은 사용 가능한 프로세스에 작업을 할당하고 실행하도록 예약합니다.

ProcessPoolExecutor를 만드는 방법은 무엇입니까?

의 도움으로 concurrent.futures 모듈 및 구체적인 하위 클래스 Executor, 우리는 쉽게 프로세스 풀을 만들 수 있습니다. 이를 위해 우리는ProcessPoolExecutor풀에서 원하는 프로세스 수로 기본적으로 숫자는 5입니다. 그 다음에는 프로세스 풀에 작업을 제출합니다.

이제 스레드 풀을 만들 때 사용한 것과 동일한 예를 고려할 것입니다. 유일한 차이점은 ProcessPoolExecutor 대신에 ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

산출

False
False
Completed

위의 예에서 프로세스PoolExecutor5 개의 스레드로 구성되었습니다. 그런 다음 메시지를 제공하기 전에 2 초 동안 대기하는 작업이 프로세스 풀 실행기에 제출됩니다. 출력에서 볼 수 있듯이 작업은 2 초까지 완료되지 않으므로 첫 번째 호출은done()False를 반환합니다. 2 초 후에 작업이 완료되고 다음을 호출하여 미래의 결과를 얻습니다.result() 그것에 방법.

ProcessPoolExecutor 인스턴스화 – 컨텍스트 관리자

ProcessPoolExecutor를 인스턴스화하는 또 다른 방법은 컨텍스트 관리자를 사용하는 것입니다. 위의 예에서 사용 된 방법과 유사하게 작동합니다. 컨텍스트 관리자를 사용할 때의 주요 이점은 구문 적으로 좋아 보인다는 것입니다. 인스턴스화는 다음 코드를 사용하여 수행 할 수 있습니다.

with ProcessPoolExecutor(max_workers = 5) as executor

더 나은 이해를 위해 스레드 풀을 만들 때 사용한 것과 동일한 예제를 사용합니다. 이 예에서는 먼저concurrent.futures기준 치수. 그런 다음 이름이 지정된 함수load_url()요청 된 URL을로드 할 생성됩니다. 그만큼ProcessPoolExecutor그런 다음 풀에있는 5 개의 스레드로 생성됩니다. 과정PoolExecutor컨텍스트 관리자로 활용되었습니다. 우리는 다음을 호출하여 미래의 결과를 얻을 수 있습니다.result() 그것에 방법.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

산출

위의 Python 스크립트는 다음 출력을 생성합니다.

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Executor.map () 함수 사용

파이썬 map()기능은 많은 작업을 수행하는 데 널리 사용됩니다. 이러한 작업 중 하나는 iterables 내의 모든 요소에 특정 함수를 적용하는 것입니다. 마찬가지로 반복기의 모든 요소를 ​​함수에 매핑하고이를 독립 작업으로 제출할 수 있습니다.ProcessPoolExecutor. 이를 이해하려면 다음 Python 스크립트 예제를 고려하십시오.

다음을 사용하여 스레드 풀을 만들 때 사용한 것과 동일한 예를 고려합니다. Executor.map()함수. 아래 주어진 예에서는 map 함수를 사용하여square() 값 배열의 모든 값에 함수를 추가합니다.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

산출

위의 Python 스크립트는 다음 출력을 생성합니다.

4
9
16
25

ProcessPoolExecutor 및 ThreadPoolExecutor는 언제 사용합니까?

이제 Executor 클래스 (ThreadPoolExecutor 및 ProcessPoolExecutor) 모두에 대해 공부 했으므로 언제 어떤 실행기를 사용해야하는지 알아야합니다. CPU 바운드 워크로드의 경우 ProcessPoolExecutor를 선택하고 I / O 바운드 워크로드의 경우 ThreadPoolExecutor를 선택해야합니다.

우리가 사용한다면 ProcessPoolExecutor, 그러면 다중 처리를 사용하기 때문에 GIL에 대해 걱정할 필요가 없습니다. 또한 실행 시간은ThreadPoolExecution. 이를 이해하려면 다음 Python 스크립트 예제를 고려하십시오.

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

산출

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

산출

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

위의 두 프로그램의 출력에서 ​​우리는 사용하는 동안 실행 시간의 차이를 볼 수 있습니다. ProcessPoolExecutorThreadPoolExecutor.