複数の労働者によるクラス
着信データのバッチストリームと相互作用するツールを構築しています。このデータを処理して結果を返す必要があります。作業を分割するために、インバウンド_in
(out
)キューとアウトバウンド()キューを持つクラスと、作業を取得、処理、およびデポジットするワーカーを作成しました。
この例では、反復可能な数(in pass_data
)を取り、それらにf
。を掛けます。
import queue, random, time
from multiprocessing import Process, Queue
def _worker(_in, out, f):
"""Get work from _in and output processed data to out"""
while True:
try:
work = _in.get()
except queue.Empty:
continue
# simulate blocking for some time
time.sleep(random.uniform(0.01, 0.5))
out.put(work * f)
class C:
def __init__(self, f, threads=2):
self.f = f
self.threads = threads
self._in, self.out = Queue(), Queue()
self.args = (self._in, self.out, self.f)
self.workers = [
Process(target=_worker, args=self.args) for _ in range(self.threads)
]
def __repr__(self):
return f"{self.__class__.__name__}(threads={self.threads})"
def start(self):
"""Start all workers"""
for worker in self.workers:
worker.start()
def terminate(self):
"""Terminate all workers"""
for worker in self.workers:
worker.terminate()
def pass_data(self, data):
"""Pass data to the queue to be processed"""
for rec in data:
self._in.put(rec)
def get_completed(self):
"""Return a list of processed data"""
items = []
while True:
try:
items.append(self.out.get_nowait())
except queue.Empty:
break
return items
if __name__ == "__main__":
c = C(f=12, threads=2)
c.start()
for i in range(5):
s = 0
n = random.randint(1, 20)
c.pass_data(list(range(n)))
print(f"sent: {n}")
while s < n:
r = c.get_completed()
s += len(r)
if r:
print(len(r), end=", ")
time.sleep(random.uniform(0.01, 0.4))
print()
c.terminate()
これは、現時点では、概念実証です。この方法に落とし穴はありますか?すでにこれを行うためのより良い方法はありますか?!
私が取り組むつもりの側面:
- キューサイズの制限
- スレッド数の制限
回答
ここにいくつかの観察と考慮すべき事柄があります。
マルチプロセッシングまたはスレッドが必要ですか?質問には、なぜそれらが必要になるのかを説明する情報はありません。それらを使用するためのオーバーヘッドがあります。おそらく、入力-計算-出力ループで十分です。
プログラムのスループットがIOまたはCPU処理によって制限されると予想しますか。一般的な経験則ではasynchio
、前者にはスレッドを使用し、後者にはプロセスを使用します。
結果が提出されたのと同じ順序で返されない可能性があることは重要ですか?タイムスタンプを付ける必要がありますか?
threads
プロセスを使用するときの紛らわしいパラメータ名です。
現在のメインコードは、アイテムを入力キューに入れ、出力キューからアイテムを取得します。キューのサイズが制限されている場合、フル入力キューへの追加時にメインコードがブロックされ、ワーカーがフル出力キューへの追加がブロックされると、デッドロックが発生する可能性があります。
使用する multiprocessing.Pool
multiprocessing
ライブラリがすでに持っている労働者のプールを使用する準備ができて実装を。コードは次のように書き直すことができます。
import time
from multiprocessing import Pool
def f(x):
time.sleep(random.uniform(0.01, 0.5))
return x * 12
if __name__ == "__main__":
c = Pool(2)
for i in range(5):
n = random.randint(1, 20)
r = c.map_async(f, list(range(n)))
print(f"sent: {n}")
print(f"got: {len(r.get())}")
一方でmultiprocessing.Pool
結果が使用して準備ができているかどうかをチェックすることを可能にする.ready()
結果にapply_async()
またはmap_async()
呼び出し、あなたがからの部分的な結果を得ることができませんmap_async()
。ただし、準備ができたらすぐに個々の結果を処理したい場合apply_async()
は、結果を処理するコールバック関数を使用して呼び出すことを検討できます。