Menggunakan beberapa pekerja dalam tugas latar belakang - Fast-API

Aug 17 2020

Saya mencoba memproses file yang diunggah oleh pengguna. Namun, saya ingin pengguna mendapatkan respons setelah unggahan selesai dan mengakhiri koneksi tetapi terus memproses file. Oleh karena itu saya menggunakan BackgroundTasks.add_tasks dan kode saya terlihat seperti ini:

class Line(BaseModel):
    line: str

@app.post("/foo")
async def foo(line: Line):
""" Processing line generate results"""

    ...

    result = ... # processing line.line
    print(results)
    return results

@app.post("/upload")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):

    background_tasks.add_task(process, csv)
    return response.text("CSV has been uploaded successfully")


async def process(csv):
    """ Processing CSV and generate data"""

    tasks = [foo(line) for line in csv]
    result = await asyncio.gather(*tasks)

Sayangnya, kode di atas hanya dijalankan satu per satu. Selain itu, saya harus menunggu sampai semua hasil diproses dan kemudian pernyataan cetak di foo berfungsi, misalkan saya memiliki n baris di csv, setelah semua n diproses adalah ketika saya melihat pernyataan cetak untuk semua. Program saya berjalan pada 20 pekerja tetapi saat proses ini berjalan, itu hanya menggunakan sekitar 1% dari CPU (foo bukan tugas komputasi, ini lebih merupakan tugas terikat IO / Jaringan). Ini membuat saya berpikir bahwa proses latar belakang hanya berjalan pada 1 pekerja. Saya mencoba ProcessPoolExecutor sebagai berikut:

loop = asyncio.get_event_loop()
lines = [line_0, line_1, ..., line_n] # Extracted all lines from CSV
with ProcessPoolExecutor() as executor:
    results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
    results = loop.run_until_complete(*results)

Namun, saya mendapatkan kesalahan berikut:

processpoolexecutor tidak bisa membuat acar objek lokal

Saya berhasil mengatasi kesalahan itu dengan mengubah pendekatan saya dari:

results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]

untuk:

results = [asyncio.ensure_future(foo(line=Line(line)) for line in lines]

Namun, kemudian saya mendapatkan kesalahan ini:

File "uvloop / loop.pyx", baris 2658, di uvloop.loop.Loop.run_in_executor AttributeError: Objek 'Loop' tidak memiliki atribut 'submit'

Ringkasnya: Untuk memproses satu baris, saya dapat menekan titik akhir "/ foo" . Sekarang, saya ingin memproses csv 200 baris. Jadi pertama-tama saya menerima file dari pengguna dan mengembalikan pesan sukses dan mengakhiri koneksi itu. Csv kemudian ditambahkan ke tugas latar belakang yang harus memetakan setiap baris ke titik akhir "/ foo" dan memberi saya hasil untuk setiap baris. Namun, semua pendekatan yang saya coba sejauh ini tampaknya hanya menggunakan satu utas dan memproses setiap baris satu per satu. Saya ingin pendekatan di mana saya dapat memproses banyak baris bersama-sama, hampir seolah-olah saya mencapai titik akhir "/ foo" beberapa kali secara bersamaan seperti kita dapat menggunakan alat seperti Apache JMeter.

Jawaban

1 alex_noname Aug 18 2020 at 10:29

Anda dapat melakukan pemrosesan secara paralel tanpa menggunakan titik akhir. Di bawah ini adalah contoh yang disederhanakan (tanpa menggunakan footitik akhir) berdasarkan kode Anda:

import asyncio
import sys
import uvicorn
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
from loguru import logger


logger.remove()
logger.add(sys.stdout, colorize=True, format="<green>{time:HH:mm:ss}</green> | {level} | <level>{message}</level>")

app = FastAPI()


async def async_io_bound(line: str):
    await asyncio.sleep(3)  # Pretend this is IO operations
    return f"Line '{line}' processed"


async def process(csv):
    """ Processing CSV and generate data"""
    tasks = [async_io_bound(line) for line in csv]
    logger.info("start processing")
    result = await asyncio.gather(*tasks)
    for i in result:
        logger.info(i)


@app.post("/upload-to-process")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
    background_tasks.add_task(process, csv.file)
    return {"result": "CSV has been uploaded successfully"}

if __name__ == "__main__":
    uvicorn.run("app3:app", host="localhost", port=8001)

Contoh keluaran (semua baris diproses secara paralel):

INFO:     ::1:52358 - "POST /upload-to-process HTTP/1.1" 200 OK
13:21:31 | INFO | start processing
13:21:34 | INFO | Line 'b'one, two\n'' processed
13:21:34 | INFO | Line 'b'0, 1\n'' processed
13:21:34 | INFO | Line 'b'1, 1\n'' processed
13:21:34 | INFO | Line 'b'2, 1\n'' processed
13:21:34 | INFO | Line 'b'3, 1\n'' processed
13:21:34 | INFO | Line 'b'4, 1\n'' processed
13:21:34 | INFO | Line 'b'5, 1\n'' processed
13:21:34 | INFO | Line 'b'6, 1\n'' processed
13:21:34 | INFO | Line 'b'7, 1\n'' processed
13:21:34 | INFO | Line 'b'8, 1\n'' processed
13:21:34 | INFO | Line 'b'9, 1\n'' processed