Python의 동시성-프로세스 풀
프로세스 풀은 스레드 풀을 만들고 사용한 것과 동일한 방식으로 만들고 사용할 수 있습니다. 프로세스 풀은 작업을받을 준비가 된 사전 인스턴스화 및 유휴 프로세스 그룹으로 정의 할 수 있습니다. 많은 작업을 수행해야 할 때 모든 작업에 대해 새 프로세스를 인스턴스화하는 것보다 프로세스 풀을 만드는 것이 좋습니다.
Python 모듈 – Concurrent.futures
Python 표준 라이브러리에는 concurrent.futures. 이 모듈은 개발자에게 비동기 작업을 시작하기위한 고급 인터페이스를 제공하기 위해 Python 3.2에 추가되었습니다. 스레드 또는 프로세스 풀을 사용하여 작업을 실행하기위한 인터페이스를 제공하기위한 Python의 스레딩 및 다중 처리 모듈 상단에있는 추상화 계층입니다.
이후 섹션에서는 concurrent.futures 모듈의 여러 하위 클래스를 살펴볼 것입니다.
실행자 클래스
Executor 의 추상 클래스입니다 concurrent.futuresPython 모듈. 직접 사용할 수 없으며 다음 구체적인 하위 클래스 중 하나를 사용해야합니다.
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor – 구체적인 하위 클래스
Executor 클래스의 구체적인 하위 클래스 중 하나입니다. 다중 처리를 사용하고 작업 제출을위한 프로세스 풀을 얻습니다. 이 풀은 사용 가능한 프로세스에 작업을 할당하고 실행하도록 예약합니다.
ProcessPoolExecutor를 만드는 방법은 무엇입니까?
의 도움으로 concurrent.futures 모듈 및 구체적인 하위 클래스 Executor, 우리는 쉽게 프로세스 풀을 만들 수 있습니다. 이를 위해 우리는ProcessPoolExecutor풀에서 원하는 프로세스 수로 기본적으로 숫자는 5입니다. 그 다음에는 프로세스 풀에 작업을 제출합니다.
예
이제 스레드 풀을 만들 때 사용한 것과 동일한 예를 고려할 것입니다. 유일한 차이점은 ProcessPoolExecutor 대신에 ThreadPoolExecutor .
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ProcessPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
산출
False
False
Completed
위의 예에서 프로세스PoolExecutor5 개의 스레드로 구성되었습니다. 그런 다음 메시지를 제공하기 전에 2 초 동안 대기하는 작업이 프로세스 풀 실행기에 제출됩니다. 출력에서 볼 수 있듯이 작업은 2 초까지 완료되지 않으므로 첫 번째 호출은done()False를 반환합니다. 2 초 후에 작업이 완료되고 다음을 호출하여 미래의 결과를 얻습니다.result() 그것에 방법.
ProcessPoolExecutor 인스턴스화 – 컨텍스트 관리자
ProcessPoolExecutor를 인스턴스화하는 또 다른 방법은 컨텍스트 관리자를 사용하는 것입니다. 위의 예에서 사용 된 방법과 유사하게 작동합니다. 컨텍스트 관리자를 사용할 때의 주요 이점은 구문 적으로 좋아 보인다는 것입니다. 인스턴스화는 다음 코드를 사용하여 수행 할 수 있습니다.
with ProcessPoolExecutor(max_workers = 5) as executor
예
더 나은 이해를 위해 스레드 풀을 만들 때 사용한 것과 동일한 예제를 사용합니다. 이 예에서는 먼저concurrent.futures기준 치수. 그런 다음 이름이 지정된 함수load_url()요청 된 URL을로드 할 생성됩니다. 그만큼ProcessPoolExecutor그런 다음 풀에있는 5 개의 스레드로 생성됩니다. 과정PoolExecutor컨텍스트 관리자로 활용되었습니다. 우리는 다음을 호출하여 미래의 결과를 얻을 수 있습니다.result() 그것에 방법.
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
main()
산출
위의 Python 스크립트는 다음 출력을 생성합니다.
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
Executor.map () 함수 사용
파이썬 map()기능은 많은 작업을 수행하는 데 널리 사용됩니다. 이러한 작업 중 하나는 iterables 내의 모든 요소에 특정 함수를 적용하는 것입니다. 마찬가지로 반복기의 모든 요소를 함수에 매핑하고이를 독립 작업으로 제출할 수 있습니다.ProcessPoolExecutor. 이를 이해하려면 다음 Python 스크립트 예제를 고려하십시오.
예
다음을 사용하여 스레드 풀을 만들 때 사용한 것과 동일한 예를 고려합니다. Executor.map()함수. 아래 주어진 예에서는 map 함수를 사용하여square() 값 배열의 모든 값에 함수를 추가합니다.
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ProcessPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
산출
위의 Python 스크립트는 다음 출력을 생성합니다.
4
9
16
25
ProcessPoolExecutor 및 ThreadPoolExecutor는 언제 사용합니까?
이제 Executor 클래스 (ThreadPoolExecutor 및 ProcessPoolExecutor) 모두에 대해 공부 했으므로 언제 어떤 실행기를 사용해야하는지 알아야합니다. CPU 바운드 워크로드의 경우 ProcessPoolExecutor를 선택하고 I / O 바운드 워크로드의 경우 ThreadPoolExecutor를 선택해야합니다.
우리가 사용한다면 ProcessPoolExecutor, 그러면 다중 처리를 사용하기 때문에 GIL에 대해 걱정할 필요가 없습니다. 또한 실행 시간은ThreadPoolExecution. 이를 이해하려면 다음 Python 스크립트 예제를 고려하십시오.
예
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
산출
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
산출
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
위의 두 프로그램의 출력에서 우리는 사용하는 동안 실행 시간의 차이를 볼 수 있습니다. ProcessPoolExecutor 과 ThreadPoolExecutor.