여러 작업자가있는 클래스

Aug 19 2020

들어오는 데이터의 일괄 처리 스트림과 상호 작용하는 도구를 만들고 있습니다. 이 데이터를 처리하고 결과를 반환해야합니다. 작업을 분할하기 위해 인바운드 ( _in) 및 아웃 바운드 ( out) 대기열과 작업을 가져오고, 처리하고, 보관하는 작업자 가있는 클래스를 만들었습니다 .

이 예제에서는 반복 가능한 숫자 (in pass_data)를 가져와 f.

import queue, random, time
from multiprocessing import Process, Queue

def _worker(_in, out, f):
    """Get work from _in and output processed data to out"""
    while True:
        try:
            work = _in.get()
        except queue.Empty:
            continue
        # simulate blocking for some time
        time.sleep(random.uniform(0.01, 0.5))
        out.put(work * f)

class C:
    def __init__(self, f, threads=2):
        self.f = f
        self.threads = threads
        self._in, self.out = Queue(), Queue()
        self.args = (self._in, self.out, self.f)
        self.workers = [
            Process(target=_worker, args=self.args) for _ in range(self.threads)
        ]

    def __repr__(self):
        return f"{self.__class__.__name__}(threads={self.threads})"

    def start(self):
        """Start all workers"""
        for worker in self.workers:
            worker.start()

    def terminate(self):
        """Terminate all workers"""
        for worker in self.workers:
            worker.terminate()

    def pass_data(self, data):
        """Pass data to the queue to be processed"""
        for rec in data:
            self._in.put(rec)

    def get_completed(self):
        """Return a list of processed data"""
        items = []
        while True:
            try:
                items.append(self.out.get_nowait())
            except queue.Empty:
                break
        return items

if __name__ == "__main__":
    c = C(f=12, threads=2)
    c.start()

    for i in range(5):
        s = 0
        n = random.randint(1, 20)
        c.pass_data(list(range(n)))
        print(f"sent: {n}")
        while s < n:
            r = c.get_completed()
            s += len(r)
            if r:
                print(len(r), end=", ")
            time.sleep(random.uniform(0.01, 0.4))
        print()
    c.terminate()

이것은 현재 개념 증명입니다. 이 방법에 함정이 있습니까? 이 작업을 수행하는 더 좋은 방법이 이미 있습니까?!

내가 다루고 자하는 측면 :

  • 대기열 크기 제한
  • 스레드 수 제한

답변

4 RootTwo Sep 02 2020 at 01:44

다음은 몇 가지 관찰 및 고려해야 할 사항입니다.

다중 처리 또는 스레드가 필요합니까? 질문에 왜 필요한지에 대한 정보가 없습니다. 그것들을 사용하기위한 오버 헤드가 있습니다. 아마도 입력-계산-출력 루프로 충분할 것입니다.

프로그램의 처리량이 IO 또는 CPU 처리에 의해 제한 될 것으로 예상하십니까? 경험의 일반적인 규칙은 스레드 또는 asynchio전자 를 사용 하고 나중에 프로세스를 사용하는 것입니다.

결과가 제출 된 순서대로 반환되지 않는 것이 중요합니까? 타임 스탬프가 필요합니까?

threads 프로세스를 사용할 때 혼동되는 매개 변수 이름입니다.

현재 기본 코드는 항목을 입력 대기열에 넣고 출력 대기열에서 항목을 가져옵니다. 큐의 크기가 제한된 경우 전체 입력 큐에 추가 할 때 기본 코드가 차단되고 작업자가 전체 출력 큐에 추가되지 않도록 차단되면 교착 상태가 될 수 있습니다.

5 G.Sliepen Sep 02 2020 at 01:24

사용하다 multiprocessing.Pool

multiprocessing라이브러리는 이미이 작업자 풀 사용할 준비가 구현. 코드는 다음과 같이 다시 작성할 수 있습니다.

import time
from multiprocessing import Pool

def f(x):
    time.sleep(random.uniform(0.01, 0.5))
    return x * 12

if __name__ == "__main__":
    c = Pool(2)

    for i in range(5):
        n = random.randint(1, 20)
        r = c.map_async(f, list(range(n)))
        print(f"sent: {n}")
        print(f"got: {len(r.get())}")

또는 호출 의 결과를 multiprocessing.Pool사용하여 결과가 준비되었는지 확인할 수 있지만 에서 부분 결과를 얻을 수는 없습니다 . 그러나 개별 결과가 준비되는 즉시 처리하려면 결과를 처리하는 콜백 함수로 호출 하는 것을 고려할 수 있습니다..ready()apply_async()map_async()map_async()apply_async()