Comment envoyer une progression d'opération dans une application FastAPI?

Nov 19 2020

J'ai déployé un endpoint fastapi,

from fastapi import FastAPI, UploadFile
from typing import List

app = FastAPI()

@app.post('/work/test')
async def testing(files: List(UploadFile)):
    for i in files:
        .......
        # do a lot of operations on each file

        # after than I am just writing that processed data into mysql database
        # cur.execute(...)
        # cur.commit()
        .......
    
    # just returning "OK" to confirm data is written into mysql
    return {"response" : "OK"}

Je peux demander la sortie du point de terminaison de l'API et cela fonctionne parfaitement pour moi.

Maintenant, le plus grand défi pour moi de savoir combien de temps cela prend pour chaque itération. Parce que dans la partie UI (ceux qui accèdent à mon point de terminaison API), je veux les aider à afficher une barre de progression (TIME TAKEN) pour chaque itération / fichier en cours de traitement.

Y a-t-il un moyen possible pour moi d'y parvenir? Si tel est le cas, aidez-moi à savoir comment puis-je continuer?

Je vous remercie.

Réponses

2 AndriyIvaneyko Nov 28 2020 at 01:31

Voici une solution qui utilise des identifiants uniq et un dictionnaire disponible dans le monde entier qui contient des informations sur les travaux:

REMARQUE: le code ci-dessous peut être utilisé en toute sécurité jusqu'à ce que vous utilisiez des valeurs de clés dynamiques (dans l'exemple d'uuid utilisé) et que vous conserviez l'application dans un seul processus.

  1. Pour démarrer l'application, créez un fichier main.py
  2. Courir uvicorn main:app --reload
  3. Créer une entrée d'emploi en accédant http://127.0.0.1:8000/
  4. Répétez l'étape 3 pour créer plusieurs emplois
  5. Accédez à la http://127.0.0.1/statuspage pour voir les états des pages.
  6. Accédez à http://127.0.0.1/status/{identifier}pour voir la progression de la tâche par l'ID de la tâche.

Code de l'application:

from fastapi import FastAPI, UploadFile
import uuid
from typing import List


import asyncio


context = {'jobs': {}}

app = FastAPI()



async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file, file_number in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    pending_jobs[job_key]['status'] = 'done'


@app.post('/work/test')
async def testing(files: List[UploadFile]):
    identifier = str(uuid.uuid4())
    context[jobs][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}

@app.get('/status')
def status():
    return {
        'all': list(context['jobs'].values()),
    }

@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }

3 alex_noname Nov 19 2020 at 18:41

Approches

Vote

L'approche la plus préférée pour suivre la progression d'une tâche est l'interrogation:

  1. Après avoir reçu un requestpour démarrer une tâche sur un backend:
    1. Créez un task objectdans le stockage (par exemple en mémoire, redisetc.). Le task objectdoit contenir les données suivantes: task ID, status( en cours, terminé), resultet d' autres.
    2. Exécuter la tâche en arrière - plan (de coroutines, filetage, multitraitement, file d' attente des tâches comme Celery, arq, aio-pika, dramatiqet etc.)
    3. Répondez immédiatement à la réponse 202 (Accepted)en renvoyant le reçu précédemment task ID.
  2. Mettre à jour l'état de la tâche:
    1. Cela peut provenir de la tâche elle-même, s'il connaît le magasin de tâches et y a accès. Périodiquement, la tâche elle-même met à jour les informations la concernant.
    2. Ou utilisez un moniteur de tâches ( Observer, producer-consumermodèle), qui surveillera l'état de la tâche et son résultat. Et il mettra également à jour les informations dans le stockage.
  3. Sur le client side( front-end), lancez un cycle d'interrogation de l'état de la tâche vers le point de terminaison /task/{ID}/status, qui prend les informations du stockage de la tâche.

Réponse en streaming

Le streaming est un moyen moins pratique d'obtenir périodiquement l'état du traitement des demandes. Lorsque nous poussons progressivement les réponses sans fermer la connexion. Il présente un certain nombre d'inconvénients importants, par exemple, si la connexion est interrompue, vous pouvez perdre des informations. Streaming Api est une autre approche que REST Api.

Websockets

Vous pouvez également utiliser des Websockets pour les notifications en temps réel et la communication bidirectionnelle.

Liens:

  • Des exemples d'approche d'interrogation pour la barre de progression et une description plus détaillée de django + celerypeuvent être trouvés sur ces liens:

https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html

https://buildwithdjango.com/blog/post/celery-progress-bars/

  • J'ai fourni des exemples simplifiés d'exécution de tâches d'arrière-plan dans FastAPI à l'aide du multitraitement ici:

https://stackoverflow.com/a/63171013/13782669

Ancienne réponse:

Vous pouvez exécuter une tâche en arrière-plan, la renvoyer idet fournir un /statuspoint de terminaison que le front appellerait périodiquement. Dans la réponse d'état, vous pouvez renvoyer l'état actuel de votre tâche (par exemple, en attente avec le numéro du fichier actuellement traité). J'ai fourni quelques exemples simples ici .

Démo

Vote

Démo de l'approche utilisant des tâches asyncio (solution à un seul travailleur):

import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}  # Dict as job storage


async def long_task(queue: asyncio.Queue, param: int):
    for i in range(1, param):  # do work and return our progress
        await asyncio.sleep(1)
        await queue.put(i)
    await queue.put(None)


async def start_new_task(uid: UUID, param: int) -> None:

    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))

    while progress := await queue.get():  # monitor task progress
        jobs[uid].progress = progress

    jobs[uid].status = "complete"


@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task


@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Exemple adapté pour boucle à partir de question

La fonction de traitement en arrière-plan est définie comme defet FastAPI l'exécute sur le pool de threads.

import time
from http import HTTPStatus

from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    processed_files: List[str] = Field(default_factory=list)


app = FastAPI()
jobs: Dict[UUID, Job] = {}


def process_files(task_id: UUID, files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        jobs[task_id].processed_files.append(i.filename)
    jobs[task_id].status = "completed"


@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task


@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Diffusion

async def process_files_gen(files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        yield f"{i.filename} processed\n"
    yield f"OK\n"


@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
    return StreamingResponse(process_files_gen(files))