Использование нескольких воркеров в фоновой задаче - Fast-API
Я пытаюсь обработать файл, загруженный пользователем. Однако я хочу, чтобы пользователь получил ответ после завершения загрузки и разорвал соединение, но продолжил обработку файла. Поэтому я использую BackgroundTasks.add_tasks, и мой код выглядит примерно так:
class Line(BaseModel):
line: str
@app.post("/foo")
async def foo(line: Line):
""" Processing line generate results"""
...
result = ... # processing line.line
print(results)
return results
@app.post("/upload")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
background_tasks.add_task(process, csv)
return response.text("CSV has been uploaded successfully")
async def process(csv):
""" Processing CSV and generate data"""
tasks = [foo(line) for line in csv]
result = await asyncio.gather(*tasks)
К сожалению, приведенный выше код выполняется только по очереди. Более того, мне нужно подождать, пока все результаты будут обработаны, а затем сработает оператор печати в foo , т.е. скажем, у меня есть n строк в csv, после того, как все n обработаны, это когда я вижу операторы печати для всех. Моя программа работает с 20 рабочими, но пока этот процесс работает, он использует только около 1% ЦП (foo не является вычислительной задачей, это скорее задача, связанная с вводом-выводом / сетью). Это наводит меня на мысль, что фоновый процесс работает только на 1 работнике. Я пробовал ProcessPoolExecutor следующим образом:
loop = asyncio.get_event_loop()
lines = [line_0, line_1, ..., line_n] # Extracted all lines from CSV
with ProcessPoolExecutor() as executor:
results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
results = loop.run_until_complete(*results)
Однако я получаю следующую ошибку:
processpoolexecutor не может обработать локальный объект
Мне удалось преодолеть эту ошибку, изменив свой подход с:
results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
кому:
results = [asyncio.ensure_future(foo(line=Line(line)) for line in lines]
Однако затем я получаю эту ошибку:
Файл "uvloop / loop.pyx", строка 2658, в uvloop.loop.Loop.run_in_executor AttributeError: объект 'Loop' не имеет атрибута 'submit'
Подводя итог: чтобы обработать одну строку, я могу попасть в конечную точку «/ foo» . Теперь я хочу обработать CSV из 200 строк. Итак, сначала я принимаю файл от пользователя, возвращаю сообщение об успешном завершении и разрываю это соединение. Затем csv добавляется к фоновой задаче, которая должна сопоставлять каждую строку с конечной точкой «/ foo» и давать мне результаты для каждой строки. Однако все подходы, которые я пробовал до сих пор, похоже, используют только один поток и обрабатывают каждую строку одну за другой. Мне нужен подход, при котором я мог бы обрабатывать несколько строк вместе, как если бы я несколько раз одновременно нажимал на конечную точку «/ foo», как мы можем использовать такие инструменты, как Apache JMeter.
Ответы
Вы можете выполнять обработку параллельно без использования конечной точки. Ниже приведен упрощенный пример (без использования foo
конечной точки) на основе вашего кода:
import asyncio
import sys
import uvicorn
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
from loguru import logger
logger.remove()
logger.add(sys.stdout, colorize=True, format="<green>{time:HH:mm:ss}</green> | {level} | <level>{message}</level>")
app = FastAPI()
async def async_io_bound(line: str):
await asyncio.sleep(3) # Pretend this is IO operations
return f"Line '{line}' processed"
async def process(csv):
""" Processing CSV and generate data"""
tasks = [async_io_bound(line) for line in csv]
logger.info("start processing")
result = await asyncio.gather(*tasks)
for i in result:
logger.info(i)
@app.post("/upload-to-process")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
background_tasks.add_task(process, csv.file)
return {"result": "CSV has been uploaded successfully"}
if __name__ == "__main__":
uvicorn.run("app3:app", host="localhost", port=8001)
Пример вывода (все строки обрабатывались параллельно):
INFO: ::1:52358 - "POST /upload-to-process HTTP/1.1" 200 OK
13:21:31 | INFO | start processing
13:21:34 | INFO | Line 'b'one, two\n'' processed
13:21:34 | INFO | Line 'b'0, 1\n'' processed
13:21:34 | INFO | Line 'b'1, 1\n'' processed
13:21:34 | INFO | Line 'b'2, 1\n'' processed
13:21:34 | INFO | Line 'b'3, 1\n'' processed
13:21:34 | INFO | Line 'b'4, 1\n'' processed
13:21:34 | INFO | Line 'b'5, 1\n'' processed
13:21:34 | INFO | Line 'b'6, 1\n'' processed
13:21:34 | INFO | Line 'b'7, 1\n'' processed
13:21:34 | INFO | Line 'b'8, 1\n'' processed
13:21:34 | INFO | Line 'b'9, 1\n'' processed