Wie sende ich einen Betriebsfortschritt in einer FastAPI-App?
Ich habe einen Fastapi-Endpunkt bereitgestellt.
from fastapi import FastAPI, UploadFile
from typing import List
app = FastAPI()
@app.post('/work/test')
async def testing(files: List(UploadFile)):
for i in files:
.......
# do a lot of operations on each file
# after than I am just writing that processed data into mysql database
# cur.execute(...)
# cur.commit()
.......
# just returning "OK" to confirm data is written into mysql
return {"response" : "OK"}
Ich kann die Ausgabe vom API-Endpunkt anfordern und es funktioniert perfekt für mich.
Nun ist es für mich die größte Herausforderung zu wissen, wie viel Zeit für jede Iteration benötigt wird. Weil ich im UI-Teil (diejenigen, die auf meinen API-Endpunkt zugreifen) ihnen helfen möchte, für jede Iteration / Datei, die verarbeitet wird, einen Fortschrittsbalken (TIME TAKEN) anzuzeigen.
Gibt es eine Möglichkeit für mich, dies zu erreichen? Wenn ja, helfen Sie mir bitte, wie ich weiter vorgehen kann.
Vielen Dank.
Antworten
Im Folgenden finden Sie eine Lösung, die eindeutige Bezeichner und ein global verfügbares Wörterbuch verwendet, das Informationen zu den Jobs enthält:
HINWEIS: Der folgende Code kann sicher verwendet werden, bis Sie die Werte für dynamische Schlüssel verwenden (in der verwendeten Beispiel-UUID) und die Anwendung innerhalb eines einzelnen Prozesses beibehalten.
- Um die App zu starten, erstellen Sie eine Datei
main.py
- Lauf
uvicorn main:app --reload
- Erstellen Sie einen Jobeintrag, indem Sie auf zugreifen
http://127.0.0.1:8000/
- Wiederholen Sie Schritt 3, um mehrere Jobs zu erstellen
- Gehen Sie zur
http://127.0.0.1/status
Seite, um den Seitenstatus anzuzeigen. - Gehen Sie zu
http://127.0.0.1/status/{identifier}
, um den Fortschritt des Jobs anhand der Job-ID anzuzeigen.
Code der App:
from fastapi import FastAPI, UploadFile
import uuid
from typing import List
import asyncio
context = {'jobs': {}}
app = FastAPI()
async def do_work(job_key, files=None):
iter_over = files if files else range(100)
for file, file_number in enumerate(iter_over):
jobs = context['jobs']
job_info = jobs[job_key]
job_info['iteration'] = file_number
job_info['status'] = 'inprogress'
await asyncio.sleep(1)
pending_jobs[job_key]['status'] = 'done'
@app.post('/work/test')
async def testing(files: List[UploadFile]):
identifier = str(uuid.uuid4())
context[jobs][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/')
async def get_testing():
identifier = str(uuid.uuid4())
context['jobs'][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/status')
def status():
return {
'all': list(context['jobs'].values()),
}
@app.get('/status/{identifier}')
async def status(identifier):
return {
"status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
}
Nähert sich
Umfragen
Der am meisten bevorzugte Ansatz, um den Fortschritt einer Aufgabe zu verfolgen, ist die Abfrage:
- Nach Erhalt von a
request
, um eine Aufgabe in einem Backend zu starten:- Erstellen Sie eine
task object
im Speicher (z. B. im Speicherredis
usw.). Dastask object
muss die folgenden Daten enthalten :task ID
,status
(ausstehend, abgeschlossen)result
und andere. - Führen Aufgabe im Hintergrund (Koroutinen, Gewindeschneiden, Multiprocessing, Task - Warteschlange wie Celery, arq, aio-pika, dramatiqund etc.)
- Antwort sofort die Antwort
202 (Accepted)
durch Rücksendung der zuvor empfangenentask ID
.
- Erstellen Sie eine
- Aufgabenstatus aktualisieren:
- Dies kann innerhalb der Aufgabe selbst erfolgen, wenn sie den Aufgabenspeicher kennt und Zugriff darauf hat. In regelmäßigen Abständen aktualisiert die Aufgabe selbst Informationen über sich selbst.
- Oder verwenden Sie einen Aufgabenmonitor (
Observer
,producer-consumer
Muster), der den Status der Aufgabe und ihr Ergebnis überwacht. Außerdem werden die Informationen im Speicher aktualisiert.
- Starten Sie am
client side
(front-end
) einen Abfragezyklus für den Taskstatus zum Endpunkt/task/{ID}/status
, der Informationen aus dem Taskspeicher entnimmt.
Streaming-Antwort
Streaming ist eine weniger bequeme Methode, um den Status der Anforderungsverarbeitung regelmäßig abzurufen. Wenn wir nach und nach Antworten senden, ohne die Verbindung zu schließen. Es hat eine Reihe von erheblichen Nachteilen. Wenn beispielsweise die Verbindung unterbrochen wird, können Sie Informationen verlieren. Streaming Api ist ein anderer Ansatz als REST Api.
Websockets
Sie können Websockets auch für Echtzeitbenachrichtigungen und bidirektionale Kommunikation verwenden.
Links:
- Beispiele für den Abfrageansatz für den Fortschrittsbalken und eine detailliertere Beschreibung für
django + celery
finden Sie unter diesen Links:
https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html
https://buildwithdjango.com/blog/post/celery-progress-bars/
- Ich habe hier vereinfachte Beispiele für die Ausführung von Hintergrundaufgaben in FastAPI mithilfe von Multiprocessing bereitgestellt:
https://stackoverflow.com/a/63171013/13782669
Alte Antwort:
Sie können eine Aufgabe im Hintergrund ausführen, sie zurückgeben id
und einen /status
Endpunkt bereitstellen , den die Front regelmäßig aufruft. In der Statusantwort können Sie den Status Ihrer Aufgabe zurückgeben (z. B. ausstehend mit der Nummer der aktuell verarbeiteten Datei). Ich zur Verfügung gestellt , ein paar einfachen Beispiele hier .
Demo
Umfragen
Demo des Ansatzes mit Asyncio-Tasks (Single-Worker-Lösung):
import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
progress: int = 0
result: int = None
app = FastAPI()
jobs: Dict[UUID, Job] = {} # Dict as job storage
async def long_task(queue: asyncio.Queue, param: int):
for i in range(1, param): # do work and return our progress
await asyncio.sleep(1)
await queue.put(i)
await queue.put(None)
async def start_new_task(uid: UUID, param: int) -> None:
queue = asyncio.Queue()
task = asyncio.create_task(long_task(queue, param))
while progress := await queue.get(): # monitor task progress
jobs[uid].progress = progress
jobs[uid].status = "complete"
@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(start_new_task, new_task.uid, param)
return new_task
@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
return jobs[uid]
Angepasstes Beispiel für Schleife aus Frage
Die Hintergrundverarbeitungsfunktion ist definiert als def
und wird von FastAPI im Thread-Pool ausgeführt.
import time
from http import HTTPStatus
from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
processed_files: List[str] = Field(default_factory=list)
app = FastAPI()
jobs: Dict[UUID, Job] = {}
def process_files(task_id: UUID, files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
jobs[task_id].processed_files.append(i.filename)
jobs[task_id].status = "completed"
@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(process_files, new_task.uid, files)
return new_task
@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
return jobs[uid]
Streaming
async def process_files_gen(files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
yield f"{i.filename} processed\n"
yield f"OK\n"
@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
return StreamingResponse(process_files_gen(files))