Класс с несколькими рабочими
Я создаю инструмент, который взаимодействует с пакетным потоком входящих данных. Эти данные нужно обработать и вернуть результат. Чтобы разделить работу, я создал класс с очередями inbound ( _in
) и outbound ( out
) и рабочими, которые получают, обрабатывают и депонируют работу.
В этом примере берется итерация чисел (in pass_data
) и они умножаются на f
.
import queue, random, time
from multiprocessing import Process, Queue
def _worker(_in, out, f):
"""Get work from _in and output processed data to out"""
while True:
try:
work = _in.get()
except queue.Empty:
continue
# simulate blocking for some time
time.sleep(random.uniform(0.01, 0.5))
out.put(work * f)
class C:
def __init__(self, f, threads=2):
self.f = f
self.threads = threads
self._in, self.out = Queue(), Queue()
self.args = (self._in, self.out, self.f)
self.workers = [
Process(target=_worker, args=self.args) for _ in range(self.threads)
]
def __repr__(self):
return f"{self.__class__.__name__}(threads={self.threads})"
def start(self):
"""Start all workers"""
for worker in self.workers:
worker.start()
def terminate(self):
"""Terminate all workers"""
for worker in self.workers:
worker.terminate()
def pass_data(self, data):
"""Pass data to the queue to be processed"""
for rec in data:
self._in.put(rec)
def get_completed(self):
"""Return a list of processed data"""
items = []
while True:
try:
items.append(self.out.get_nowait())
except queue.Empty:
break
return items
if __name__ == "__main__":
c = C(f=12, threads=2)
c.start()
for i in range(5):
s = 0
n = random.randint(1, 20)
c.pass_data(list(range(n)))
print(f"sent: {n}")
while s < n:
r = c.get_completed()
s += len(r)
if r:
print(len(r), end=", ")
time.sleep(random.uniform(0.01, 0.4))
print()
c.terminate()
На данный момент это проверка концепции. Есть ли в этом методе подводные камни? Есть уже лучший способ сделать это ?!
Аспекты, которые я собираюсь затронуть:
- ограничения размера очереди
- ограничение числа потоков
Ответы
Вот некоторые наблюдения и моменты, которые следует учитывать.
Вы уверены, что вам нужна многопроцессорность или потоки? В вопросе нет информации, чтобы сказать, зачем они могут понадобиться. Есть накладные расходы на их использование. Возможно, цикла ввода-вычисления-вывода будет достаточно.
Ожидаете ли вы, что пропускная способность программы будет ограничена вводом-выводом или обработкой ЦП. Общее эмпирическое правило - использовать потоки или asynchio
для первого и процессы для последующего.
Имеет ли значение, что результаты не могут быть возвращены в том же порядке, в котором они были отправлены? Нужно ли на них указывать время?
threads
вводит в заблуждение имя параметра при использовании процессов.
Текущий основной код помещает элементы во входную очередь и получает элементы из выходной очереди. Если очереди имеют ограниченные размеры, будет возможно зайти в тупик, если основной код заблокирован при добавлении в полную очередь ввода, а рабочим заблокировано добавление в полную очередь вывода.
Использовать multiprocessing.Pool
В multiprocessing
библиотеке уже есть реализация рабочего пула, готовая к использованию. Ваш код можно переписать как:
import time
from multiprocessing import Pool
def f(x):
time.sleep(random.uniform(0.01, 0.5))
return x * 12
if __name__ == "__main__":
c = Pool(2)
for i in range(5):
n = random.randint(1, 20)
r = c.map_async(f, list(range(n)))
print(f"sent: {n}")
print(f"got: {len(r.get())}")
Хотя multiprocessing.Pool
позволяет вам проверить, готовы ли результаты, используя .ready()
результат вызова apply_async()
или map_async()
, вы не можете получить частичный результат из map_async()
. Однако, если вы действительно хотите обрабатывать отдельные результаты, как только они будут готовы, вы можете рассмотреть возможность вызова apply_async()
с помощью функции обратного вызова, которая обрабатывает результат.