Pythonでの並行性-スレッドのプール
マルチスレッドタスク用に多数のスレッドを作成する必要があるとします。スレッドが多すぎるためにパフォーマンスの問題が多数発生する可能性があるため、計算コストが最も高くなります。主な問題は、スループットが制限されることである可能性があります。スレッドのプールを作成することで、この問題を解決できます。スレッドプールは、事前にインスタンス化されたアイドル状態のスレッドのグループとして定義できます。これらのスレッドは、作業を開始する準備ができています。多数のタスクを実行する必要がある場合は、すべてのタスクに対して新しいスレッドをインスタンス化するよりも、スレッドプールを作成することをお勧めします。スレッドプールは、次のように多数のスレッドの同時実行を管理できます。
スレッドプール内のスレッドが実行を完了すると、そのスレッドを再利用できます。
スレッドが終了すると、そのスレッドを置き換えるために別のスレッドが作成されます。
Pythonモジュール– Concurrent.futures
Python標準ライブラリには concurrent.futuresモジュール。このモジュールは、非同期タスクを起動するための高レベルのインターフェースを開発者に提供するためにPython3.2で追加されました。これは、スレッドまたはプロセスのプールを使用してタスクを実行するためのインターフェイスを提供するための、Pythonのスレッド化およびマルチプロセッシングモジュールの上部にある抽象化レイヤーです。
以降のセクションでは、concurrent.futuresモジュールのさまざまなクラスについて学習します。
エグゼキュータクラス
Executorの抽象クラスです concurrent.futuresPythonモジュール。直接使用することはできず、次の具体的なサブクラスのいずれかを使用する必要があります-
- ThreadPoolExecutor
- ProcessPoolExecutor
ThreadPoolExecutor –具象サブクラス
これは、Executorクラスの具体的なサブクラスの1つです。サブクラスはマルチスレッドを使用し、タスクを送信するためのスレッドのプールを取得します。このプールは、使用可能なスレッドにタスクを割り当て、実行するようにスケジュールします。
ThreadPoolExecutorを作成する方法は?
の助けを借りて concurrent.futures モジュールとその具体的なサブクラス Executor、スレッドのプールを簡単に作成できます。このために、私たちは構築する必要がありますThreadPoolExecutorプールに必要なスレッドの数で。デフォルトでは、番号は5です。次に、タスクをスレッドプールに送信できます。私たちがsubmit() タスク、私たちは戻ってきます Future。Futureオブジェクトには、というメソッドがありますdone()、未来が解決したかどうかを示します。これにより、その特定の将来のオブジェクトに値が設定されました。タスクが終了すると、スレッドプールエグゼキュータは値をfutureオブジェクトに設定します。
例
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ThreadPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
出力
False
True
Completed
上記の例では、 ThreadPoolExecutor5つのスレッドで構成されています。次に、メッセージを送信する前に2秒間待機するタスクが、スレッドプールエグゼキュータに送信されます。出力からわかるように、タスクは2秒まで完了しないため、最初の呼び出しはdone()Falseを返します。2秒後、タスクが完了し、を呼び出すことで将来の結果を取得しますresult() その上でメソッド。
ThreadPoolExecutorのインスタンス化–コンテキストマネージャー
インスタンス化する別の方法 ThreadPoolExecutorコンテキストマネージャーの助けを借りています。上記の例で使用した方法と同様に機能します。コンテキストマネージャを使用する主な利点は、構文的に見栄えがよいことです。インスタンス化は、次のコードを使用して実行できます-
with ThreadPoolExecutor(max_workers = 5) as executor
例
次の例は、Pythonドキュメントから借用したものです。この例では、まず最初にconcurrent.futuresモジュールをインポートする必要があります。次に、という名前の関数load_url()要求されたURLをロードするが作成されます。次に、関数は作成しますThreadPoolExecutorプール内の5つのスレッドで。ザ・ThreadPoolExecutorコンテキストマネージャーとして利用されています。と呼ぶことで未来の結果を得ることができますresult() その上でメソッド。
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
出力
以下は、上記のPythonスクリプトの出力です-
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes
Executor.map()関数の使用
Python map()関数は多くのタスクで広く使用されています。そのようなタスクの1つは、反復可能オブジェクト内のすべての要素に特定の関数を適用することです。同様に、イテレータのすべての要素を関数にマップし、これらを独立したジョブとして送信できます。ThreadPoolExecutor。関数がどのように機能するかを理解するために、次のPythonスクリプトの例を検討してください。
例
以下のこの例では、map関数を使用して square() 値配列内のすべての値に対して関数。
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ThreadPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
出力
上記のPythonスクリプトは、次の出力を生成します-
4
9
16
25