Pythonでの並行性-プロセスのプール
プロセスのプールは、スレッドのプールを作成して使用したのと同じ方法で作成して使用できます。プロセスプールは、事前にインスタンス化されたアイドル状態のプロセスのグループとして定義できます。これらのプロセスは、作業を開始する準備ができています。多数のタスクを実行する必要がある場合は、すべてのタスクに対して新しいプロセスをインスタンス化するよりも、プロセスプールを作成することをお勧めします。
Pythonモジュール– Concurrent.futures
Python標準ライブラリには、 concurrent.futures。このモジュールは、非同期タスクを起動するための高レベルのインターフェースを開発者に提供するためにPython3.2で追加されました。これは、スレッドまたはプロセスのプールを使用してタスクを実行するためのインターフェイスを提供するための、Pythonのスレッド化およびマルチプロセッシングモジュールの上部にある抽象化レイヤーです。
以降のセクションでは、concurrent.futuresモジュールのさまざまなサブクラスについて説明します。
エグゼキュータクラス
Executor の抽象クラスです concurrent.futuresPythonモジュール。直接使用することはできず、次の具体的なサブクラスのいずれかを使用する必要があります-
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor –具体的なサブクラス
これは、Executorクラスの具体的なサブクラスの1つです。マルチプロセッシングを使用し、タスクを送信するためのプロセスのプールを取得します。このプールは、使用可能なプロセスにタスクを割り当て、実行するようにスケジュールします。
ProcessPoolExecutorを作成する方法は?
の助けを借りて concurrent.futures モジュールとその具体的なサブクラス Executor、プロセスのプールを簡単に作成できます。このために、私たちは構築する必要がありますProcessPoolExecutorプールに必要なプロセスの数で。デフォルトでは、番号は5です。これに続いて、プロセスプールにタスクが送信されます。
例
ここで、スレッドプールの作成時に使用したのと同じ例を検討しますが、唯一の違いは、次に使用することです。 ProcessPoolExecutor の代わりに ThreadPoolExecutor 。
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ProcessPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
出力
False
False
Completed
上記の例では、プロセスPoolExecutor5つのスレッドで構成されています。次に、メッセージを送信する前に2秒間待機するタスクが、プロセスプールエグゼキュータに送信されます。出力からわかるように、タスクは2秒まで完了しないため、最初の呼び出しはdone()Falseを返します。2秒後、タスクが完了し、を呼び出すことで将来の結果を取得しますresult() その上でメソッド。
ProcessPoolExecutorのインスタンス化–コンテキストマネージャー
ProcessPoolExecutorをインスタンス化する別の方法は、コンテキストマネージャーを使用することです。上記の例で使用した方法と同様に機能します。コンテキストマネージャを使用する主な利点は、構文的に見栄えがよいことです。インスタンス化は、次のコードを使用して実行できます-
with ProcessPoolExecutor(max_workers = 5) as executor
例
理解を深めるために、スレッドプールの作成時に使用したのと同じ例を取り上げています。この例では、まずインポートする必要がありますconcurrent.futuresモジュール。次に、という名前の関数load_url()要求されたURLをロードするが作成されます。ザ・ProcessPoolExecutor次に、プール内の5つのスレッド数で作成されます。プロセスPoolExecutorコンテキストマネージャーとして利用されています。と呼ぶことで未来の結果を得ることができますresult() その上でメソッド。
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
main()
出力
上記のPythonスクリプトは、次の出力を生成します-
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
Executor.map()関数の使用
Python map()関数は、多くのタスクを実行するために広く使用されています。そのようなタスクの1つは、反復可能オブジェクト内のすべての要素に特定の関数を適用することです。同様に、イテレータのすべての要素を関数にマップし、これらを独立したジョブとしてProcessPoolExecutor。これを理解するために、次のPythonスクリプトの例を検討してください。
例
を使用してスレッドプールを作成するときに使用したのと同じ例を検討します。 Executor.map()関数。以下の例では、map関数を使用して適用していますsquare() 値配列内のすべての値に対して関数。
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ProcessPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
出力
上記のPythonスクリプトは、次の出力を生成します
4
9
16
25
ProcessPoolExecutorとThreadPoolExecutorをいつ使用するのですか?
ThreadPoolExecutorとProcessPoolExecutorの両方のエグゼキュータークラスについて学習したので、どちらのエグゼキューターをいつ使用するかを知る必要があります。CPUバウンドワークロードの場合はProcessPoolExecutorを選択し、I / Oバウンドワークロードの場合はThreadPoolExecutorを選択する必要があります。
使用する場合 ProcessPoolExecutor、マルチプロセッシングを使用しているため、GILについて心配する必要はありません。さらに、実行時間は比較すると短くなりますThreadPoolExecution。これを理解するには、次のPythonスクリプトの例を検討してください。
例
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
出力
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
出力
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
上記の両方のプログラムの出力から、使用中の実行時間の違いを確認できます。 ProcessPoolExecutor そして ThreadPoolExecutor。