Comment envoyer une progression d'opération dans une application FastAPI?
J'ai déployé un endpoint fastapi,
from fastapi import FastAPI, UploadFile
from typing import List
app = FastAPI()
@app.post('/work/test')
async def testing(files: List(UploadFile)):
for i in files:
.......
# do a lot of operations on each file
# after than I am just writing that processed data into mysql database
# cur.execute(...)
# cur.commit()
.......
# just returning "OK" to confirm data is written into mysql
return {"response" : "OK"}
Je peux demander la sortie du point de terminaison de l'API et cela fonctionne parfaitement pour moi.
Maintenant, le plus grand défi pour moi de savoir combien de temps cela prend pour chaque itération. Parce que dans la partie UI (ceux qui accèdent à mon point de terminaison API), je veux les aider à afficher une barre de progression (TIME TAKEN) pour chaque itération / fichier en cours de traitement.
Y a-t-il un moyen possible pour moi d'y parvenir? Si tel est le cas, aidez-moi à savoir comment puis-je continuer?
Je vous remercie.
Réponses
Voici une solution qui utilise des identifiants uniq et un dictionnaire disponible dans le monde entier qui contient des informations sur les travaux:
REMARQUE: le code ci-dessous peut être utilisé en toute sécurité jusqu'à ce que vous utilisiez des valeurs de clés dynamiques (dans l'exemple d'uuid utilisé) et que vous conserviez l'application dans un seul processus.
- Pour démarrer l'application, créez un fichier
main.py
- Courir
uvicorn main:app --reload
- Créer une entrée d'emploi en accédant
http://127.0.0.1:8000/
- Répétez l'étape 3 pour créer plusieurs emplois
- Accédez à la
http://127.0.0.1/status
page pour voir les états des pages. - Accédez à
http://127.0.0.1/status/{identifier}
pour voir la progression de la tâche par l'ID de la tâche.
Code de l'application:
from fastapi import FastAPI, UploadFile
import uuid
from typing import List
import asyncio
context = {'jobs': {}}
app = FastAPI()
async def do_work(job_key, files=None):
iter_over = files if files else range(100)
for file, file_number in enumerate(iter_over):
jobs = context['jobs']
job_info = jobs[job_key]
job_info['iteration'] = file_number
job_info['status'] = 'inprogress'
await asyncio.sleep(1)
pending_jobs[job_key]['status'] = 'done'
@app.post('/work/test')
async def testing(files: List[UploadFile]):
identifier = str(uuid.uuid4())
context[jobs][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/')
async def get_testing():
identifier = str(uuid.uuid4())
context['jobs'][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/status')
def status():
return {
'all': list(context['jobs'].values()),
}
@app.get('/status/{identifier}')
async def status(identifier):
return {
"status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
}
Approches
Vote
L'approche la plus préférée pour suivre la progression d'une tâche est l'interrogation:
- Après avoir reçu un
request
pour démarrer une tâche sur un backend:- Créez un
task object
dans le stockage (par exemple en mémoire,redis
etc.). Letask object
doit contenir les données suivantes:task ID
,status
( en cours, terminé),result
et d' autres. - Exécuter la tâche en arrière - plan (de coroutines, filetage, multitraitement, file d' attente des tâches comme Celery, arq, aio-pika, dramatiqet etc.)
- Répondez immédiatement à la réponse
202 (Accepted)
en renvoyant le reçu précédemmenttask ID
.
- Créez un
- Mettre à jour l'état de la tâche:
- Cela peut provenir de la tâche elle-même, s'il connaît le magasin de tâches et y a accès. Périodiquement, la tâche elle-même met à jour les informations la concernant.
- Ou utilisez un moniteur de tâches (
Observer
,producer-consumer
modèle), qui surveillera l'état de la tâche et son résultat. Et il mettra également à jour les informations dans le stockage.
- Sur le
client side
(front-end
), lancez un cycle d'interrogation de l'état de la tâche vers le point de terminaison/task/{ID}/status
, qui prend les informations du stockage de la tâche.
Réponse en streaming
Le streaming est un moyen moins pratique d'obtenir périodiquement l'état du traitement des demandes. Lorsque nous poussons progressivement les réponses sans fermer la connexion. Il présente un certain nombre d'inconvénients importants, par exemple, si la connexion est interrompue, vous pouvez perdre des informations. Streaming Api est une autre approche que REST Api.
Websockets
Vous pouvez également utiliser des Websockets pour les notifications en temps réel et la communication bidirectionnelle.
Liens:
- Des exemples d'approche d'interrogation pour la barre de progression et une description plus détaillée de
django + celery
peuvent être trouvés sur ces liens:
https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html
https://buildwithdjango.com/blog/post/celery-progress-bars/
- J'ai fourni des exemples simplifiés d'exécution de tâches d'arrière-plan dans FastAPI à l'aide du multitraitement ici:
https://stackoverflow.com/a/63171013/13782669
Ancienne réponse:
Vous pouvez exécuter une tâche en arrière-plan, la renvoyer id
et fournir un /status
point de terminaison que le front appellerait périodiquement. Dans la réponse d'état, vous pouvez renvoyer l'état actuel de votre tâche (par exemple, en attente avec le numéro du fichier actuellement traité). J'ai fourni quelques exemples simples ici .
Démo
Vote
Démo de l'approche utilisant des tâches asyncio (solution à un seul travailleur):
import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
progress: int = 0
result: int = None
app = FastAPI()
jobs: Dict[UUID, Job] = {} # Dict as job storage
async def long_task(queue: asyncio.Queue, param: int):
for i in range(1, param): # do work and return our progress
await asyncio.sleep(1)
await queue.put(i)
await queue.put(None)
async def start_new_task(uid: UUID, param: int) -> None:
queue = asyncio.Queue()
task = asyncio.create_task(long_task(queue, param))
while progress := await queue.get(): # monitor task progress
jobs[uid].progress = progress
jobs[uid].status = "complete"
@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(start_new_task, new_task.uid, param)
return new_task
@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
return jobs[uid]
Exemple adapté pour boucle à partir de question
La fonction de traitement en arrière-plan est définie comme def
et FastAPI l'exécute sur le pool de threads.
import time
from http import HTTPStatus
from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
processed_files: List[str] = Field(default_factory=list)
app = FastAPI()
jobs: Dict[UUID, Job] = {}
def process_files(task_id: UUID, files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
jobs[task_id].processed_files.append(i.filename)
jobs[task_id].status = "completed"
@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(process_files, new_task.uid, files)
return new_task
@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
return jobs[uid]
Diffusion
async def process_files_gen(files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
yield f"{i.filename} processed\n"
yield f"OK\n"
@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
return StreamingResponse(process_files_gen(files))