Использование нескольких воркеров в фоновой задаче - Fast-API

Aug 17 2020

Я пытаюсь обработать файл, загруженный пользователем. Однако я хочу, чтобы пользователь получил ответ после завершения загрузки и разорвал соединение, но продолжил обработку файла. Поэтому я использую BackgroundTasks.add_tasks, и мой код выглядит примерно так:

class Line(BaseModel):
    line: str

@app.post("/foo")
async def foo(line: Line):
""" Processing line generate results"""

    ...

    result = ... # processing line.line
    print(results)
    return results

@app.post("/upload")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):

    background_tasks.add_task(process, csv)
    return response.text("CSV has been uploaded successfully")


async def process(csv):
    """ Processing CSV and generate data"""

    tasks = [foo(line) for line in csv]
    result = await asyncio.gather(*tasks)

К сожалению, приведенный выше код выполняется только по очереди. Более того, мне нужно подождать, пока все результаты будут обработаны, а затем сработает оператор печати в foo , т.е. скажем, у меня есть n строк в csv, после того, как все n обработаны, это когда я вижу операторы печати для всех. Моя программа работает с 20 рабочими, но пока этот процесс работает, он использует только около 1% ЦП (foo не является вычислительной задачей, это скорее задача, связанная с вводом-выводом / сетью). Это наводит меня на мысль, что фоновый процесс работает только на 1 работнике. Я пробовал ProcessPoolExecutor следующим образом:

loop = asyncio.get_event_loop()
lines = [line_0, line_1, ..., line_n] # Extracted all lines from CSV
with ProcessPoolExecutor() as executor:
    results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
    results = loop.run_until_complete(*results)

Однако я получаю следующую ошибку:

processpoolexecutor не может обработать локальный объект

Мне удалось преодолеть эту ошибку, изменив свой подход с:

results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]

кому:

results = [asyncio.ensure_future(foo(line=Line(line)) for line in lines]

Однако затем я получаю эту ошибку:

Файл "uvloop / loop.pyx", строка 2658, в uvloop.loop.Loop.run_in_executor AttributeError: объект 'Loop' не имеет атрибута 'submit'

Подводя итог: чтобы обработать одну строку, я могу попасть в конечную точку «/ foo» . Теперь я хочу обработать CSV из 200 строк. Итак, сначала я принимаю файл от пользователя, возвращаю сообщение об успешном завершении и разрываю это соединение. Затем csv добавляется к фоновой задаче, которая должна сопоставлять каждую строку с конечной точкой «/ foo» и давать мне результаты для каждой строки. Однако все подходы, которые я пробовал до сих пор, похоже, используют только один поток и обрабатывают каждую строку одну за другой. Мне нужен подход, при котором я мог бы обрабатывать несколько строк вместе, как если бы я несколько раз одновременно нажимал на конечную точку «/ foo», как мы можем использовать такие инструменты, как Apache JMeter.

Ответы

1 alex_noname Aug 18 2020 at 10:29

Вы можете выполнять обработку параллельно без использования конечной точки. Ниже приведен упрощенный пример (без использования fooконечной точки) на основе вашего кода:

import asyncio
import sys
import uvicorn
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
from loguru import logger


logger.remove()
logger.add(sys.stdout, colorize=True, format="<green>{time:HH:mm:ss}</green> | {level} | <level>{message}</level>")

app = FastAPI()


async def async_io_bound(line: str):
    await asyncio.sleep(3)  # Pretend this is IO operations
    return f"Line '{line}' processed"


async def process(csv):
    """ Processing CSV and generate data"""
    tasks = [async_io_bound(line) for line in csv]
    logger.info("start processing")
    result = await asyncio.gather(*tasks)
    for i in result:
        logger.info(i)


@app.post("/upload-to-process")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
    background_tasks.add_task(process, csv.file)
    return {"result": "CSV has been uploaded successfully"}

if __name__ == "__main__":
    uvicorn.run("app3:app", host="localhost", port=8001)

Пример вывода (все строки обрабатывались параллельно):

INFO:     ::1:52358 - "POST /upload-to-process HTTP/1.1" 200 OK
13:21:31 | INFO | start processing
13:21:34 | INFO | Line 'b'one, two\n'' processed
13:21:34 | INFO | Line 'b'0, 1\n'' processed
13:21:34 | INFO | Line 'b'1, 1\n'' processed
13:21:34 | INFO | Line 'b'2, 1\n'' processed
13:21:34 | INFO | Line 'b'3, 1\n'' processed
13:21:34 | INFO | Line 'b'4, 1\n'' processed
13:21:34 | INFO | Line 'b'5, 1\n'' processed
13:21:34 | INFO | Line 'b'6, 1\n'' processed
13:21:34 | INFO | Line 'b'7, 1\n'' processed
13:21:34 | INFO | Line 'b'8, 1\n'' processed
13:21:34 | INFO | Line 'b'9, 1\n'' processed