여러 작업자가있는 클래스
들어오는 데이터의 일괄 처리 스트림과 상호 작용하는 도구를 만들고 있습니다. 이 데이터를 처리하고 결과를 반환해야합니다. 작업을 분할하기 위해 인바운드 ( _in
) 및 아웃 바운드 ( out
) 대기열과 작업을 가져오고, 처리하고, 보관하는 작업자 가있는 클래스를 만들었습니다 .
이 예제에서는 반복 가능한 숫자 (in pass_data
)를 가져와 f
.
import queue, random, time
from multiprocessing import Process, Queue
def _worker(_in, out, f):
"""Get work from _in and output processed data to out"""
while True:
try:
work = _in.get()
except queue.Empty:
continue
# simulate blocking for some time
time.sleep(random.uniform(0.01, 0.5))
out.put(work * f)
class C:
def __init__(self, f, threads=2):
self.f = f
self.threads = threads
self._in, self.out = Queue(), Queue()
self.args = (self._in, self.out, self.f)
self.workers = [
Process(target=_worker, args=self.args) for _ in range(self.threads)
]
def __repr__(self):
return f"{self.__class__.__name__}(threads={self.threads})"
def start(self):
"""Start all workers"""
for worker in self.workers:
worker.start()
def terminate(self):
"""Terminate all workers"""
for worker in self.workers:
worker.terminate()
def pass_data(self, data):
"""Pass data to the queue to be processed"""
for rec in data:
self._in.put(rec)
def get_completed(self):
"""Return a list of processed data"""
items = []
while True:
try:
items.append(self.out.get_nowait())
except queue.Empty:
break
return items
if __name__ == "__main__":
c = C(f=12, threads=2)
c.start()
for i in range(5):
s = 0
n = random.randint(1, 20)
c.pass_data(list(range(n)))
print(f"sent: {n}")
while s < n:
r = c.get_completed()
s += len(r)
if r:
print(len(r), end=", ")
time.sleep(random.uniform(0.01, 0.4))
print()
c.terminate()
이것은 현재 개념 증명입니다. 이 방법에 함정이 있습니까? 이 작업을 수행하는 더 좋은 방법이 이미 있습니까?!
내가 다루고 자하는 측면 :
- 대기열 크기 제한
- 스레드 수 제한
답변
다음은 몇 가지 관찰 및 고려해야 할 사항입니다.
다중 처리 또는 스레드가 필요합니까? 질문에 왜 필요한지에 대한 정보가 없습니다. 그것들을 사용하기위한 오버 헤드가 있습니다. 아마도 입력-계산-출력 루프로 충분할 것입니다.
프로그램의 처리량이 IO 또는 CPU 처리에 의해 제한 될 것으로 예상하십니까? 경험의 일반적인 규칙은 스레드 또는 asynchio
전자 를 사용 하고 나중에 프로세스를 사용하는 것입니다.
결과가 제출 된 순서대로 반환되지 않는 것이 중요합니까? 타임 스탬프가 필요합니까?
threads
프로세스를 사용할 때 혼동되는 매개 변수 이름입니다.
현재 기본 코드는 항목을 입력 대기열에 넣고 출력 대기열에서 항목을 가져옵니다. 큐의 크기가 제한된 경우 전체 입력 큐에 추가 할 때 기본 코드가 차단되고 작업자가 전체 출력 큐에 추가되지 않도록 차단되면 교착 상태가 될 수 있습니다.
사용하다 multiprocessing.Pool
multiprocessing
라이브러리는 이미이 작업자 풀 사용할 준비가 구현. 코드는 다음과 같이 다시 작성할 수 있습니다.
import time
from multiprocessing import Pool
def f(x):
time.sleep(random.uniform(0.01, 0.5))
return x * 12
if __name__ == "__main__":
c = Pool(2)
for i in range(5):
n = random.randint(1, 20)
r = c.map_async(f, list(range(n)))
print(f"sent: {n}")
print(f"got: {len(r.get())}")
또는 호출 의 결과를 multiprocessing.Pool
사용하여 결과가 준비되었는지 확인할 수 있지만 에서 부분 결과를 얻을 수는 없습니다 . 그러나 개별 결과가 준비되는 즉시 처리하려면 결과를 처리하는 콜백 함수로 호출 하는 것을 고려할 수 있습니다..ready()
apply_async()
map_async()
map_async()
apply_async()