Utilisation de plusieurs travailleurs dans une tâche en arrière-plan - Fast-API

Aug 17 2020

J'essaye de traiter un fichier téléchargé par l'utilisateur. Cependant, je souhaite que l'utilisateur reçoive une réponse une fois le téléchargement terminé et qu'il mette fin à la connexion, tout en poursuivant le traitement du fichier. Par conséquent, j'utilise BackgroundTasks.add_tasks et mon code ressemble à ceci:

class Line(BaseModel):
    line: str

@app.post("/foo")
async def foo(line: Line):
""" Processing line generate results"""

    ...

    result = ... # processing line.line
    print(results)
    return results

@app.post("/upload")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):

    background_tasks.add_task(process, csv)
    return response.text("CSV has been uploaded successfully")


async def process(csv):
    """ Processing CSV and generate data"""

    tasks = [foo(line) for line in csv]
    result = await asyncio.gather(*tasks)

Malheureusement, le code ci-dessus ne s'exécute qu'un par un. De plus, je dois attendre que tous les résultats soient traités, puis imprimer l'instruction dans foo fonctionne, c'est-à-dire que j'ai n lignes dans le csv, après tout, n sont traitées, c'est quand je vois les instructions d'impression pour tous. Mon programme fonctionne sur 20 travailleurs, mais pendant que ce processus est en cours d'exécution, il n'utilise qu'environ 1% du processeur (foo n'est pas une tâche de calcul, c'est plutôt une tâche liée aux E / S / réseau). Cela me fait penser que le processus d'arrière-plan s'exécute sur un seul travailleur. J'ai essayé ProcessPoolExecutor comme suit:

loop = asyncio.get_event_loop()
lines = [line_0, line_1, ..., line_n] # Extracted all lines from CSV
with ProcessPoolExecutor() as executor:
    results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
    results = loop.run_until_complete(*results)

Cependant, j'obtiens l'erreur suivante:

processpoolexecutor ne peut pas pickle l'objet local

J'ai réussi à surmonter cette erreur en changeant mon approche de:

results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]

à:

results = [asyncio.ensure_future(foo(line=Line(line)) for line in lines]

Cependant, j'obtiens cette erreur:

Fichier "uvloop / loop.pyx", ligne 2658, dans uvloop.loop.Loop.run_in_executor AttributeError: L'objet 'Loop' n'a pas d'attribut 'submit'

Pour résumer: Pour traiter une ligne, je peux atteindre le point de terminaison "/ foo" . Maintenant, je veux traiter un csv de 200 lignes. J'accepte donc d'abord le fichier de l'utilisateur, je renvoie un message de réussite et je mets fin à cette connexion. Le csv est ensuite ajouté à une tâche d'arrière-plan qui doit mapper chaque ligne sur le point de terminaison "/ foo" et me donner les résultats pour chaque ligne. Cependant, toutes les approches que j'ai essayées jusqu'à présent semblent n'utiliser qu'un seul thread et traitent chaque ligne une par une. Je voudrais une approche où je peux traiter plusieurs lignes ensemble, presque comme si j'atteignais le point de terminaison "/ foo" plusieurs fois simultanément comme nous pouvons utiliser des outils comme Apache JMeter.

Réponses

1 alex_noname Aug 18 2020 at 10:29

Vous pouvez effectuer le traitement en parallèle sans utiliser de point de terminaison. Voici un exemple simplifié (sans utiliser de foopoint de terminaison) basé sur votre code:

import asyncio
import sys
import uvicorn
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
from loguru import logger


logger.remove()
logger.add(sys.stdout, colorize=True, format="<green>{time:HH:mm:ss}</green> | {level} | <level>{message}</level>")

app = FastAPI()


async def async_io_bound(line: str):
    await asyncio.sleep(3)  # Pretend this is IO operations
    return f"Line '{line}' processed"


async def process(csv):
    """ Processing CSV and generate data"""
    tasks = [async_io_bound(line) for line in csv]
    logger.info("start processing")
    result = await asyncio.gather(*tasks)
    for i in result:
        logger.info(i)


@app.post("/upload-to-process")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
    background_tasks.add_task(process, csv.file)
    return {"result": "CSV has been uploaded successfully"}

if __name__ == "__main__":
    uvicorn.run("app3:app", host="localhost", port=8001)

Exemple de sortie (toutes les lignes ont été traitées en parallèle):

INFO:     ::1:52358 - "POST /upload-to-process HTTP/1.1" 200 OK
13:21:31 | INFO | start processing
13:21:34 | INFO | Line 'b'one, two\n'' processed
13:21:34 | INFO | Line 'b'0, 1\n'' processed
13:21:34 | INFO | Line 'b'1, 1\n'' processed
13:21:34 | INFO | Line 'b'2, 1\n'' processed
13:21:34 | INFO | Line 'b'3, 1\n'' processed
13:21:34 | INFO | Line 'b'4, 1\n'' processed
13:21:34 | INFO | Line 'b'5, 1\n'' processed
13:21:34 | INFO | Line 'b'6, 1\n'' processed
13:21:34 | INFO | Line 'b'7, 1\n'' processed
13:21:34 | INFO | Line 'b'8, 1\n'' processed
13:21:34 | INFO | Line 'b'9, 1\n'' processed