Jak przesyłać postęp operacji w aplikacji FastAPI?
Wdrożyłem punkt końcowy fastapi,
from fastapi import FastAPI, UploadFile
from typing import List
app = FastAPI()
@app.post('/work/test')
async def testing(files: List(UploadFile)):
for i in files:
.......
# do a lot of operations on each file
# after than I am just writing that processed data into mysql database
# cur.execute(...)
# cur.commit()
.......
# just returning "OK" to confirm data is written into mysql
return {"response" : "OK"}
Mogę zażądać danych wyjściowych z punktu końcowego API i działa dobrze dla mnie.
Teraz największym wyzwaniem dla mnie jest wiedzieć, ile czasu zajmuje każda iteracja. Ponieważ w części UI (ci, którzy uzyskują dostęp do mojego punktu końcowego API) chcę im pomóc w wyświetlaniu paska postępu (CZAS ZABRONIONY) dla każdej przetwarzanej iteracji / pliku.
Czy jest jakikolwiek sposób, aby to osiągnąć? Jeśli tak, pomóż mi, jak mogę dalej postępować?
Dziękuję Ci.
Odpowiedzi
Poniżej znajduje się rozwiązanie wykorzystujące unikalne identyfikatory oraz dostępny globalnie słownik przechowujący informacje o ofertach pracy:
UWAGA: Poniższy kod jest bezpieczny w użyciu, dopóki nie użyjesz wartości kluczy dynamicznych (w używanym przykładowym identyfikatorze użytkownika) i nie utrzymasz aplikacji w ramach jednego procesu.
- Aby uruchomić aplikację, utwórz plik
main.py
- Biegać
uvicorn main:app --reload
- Utwórz pozycję pracy, uzyskując dostęp
http://127.0.0.1:8000/
- Powtórz krok 3, aby utworzyć wiele zadań
- Przejdź do
http://127.0.0.1/status
strony, aby zobaczyć statusy stron. - Idź do,
http://127.0.0.1/status/{identifier}
aby zobaczyć postęp pracy według identyfikatora pracy.
Kod aplikacji:
from fastapi import FastAPI, UploadFile
import uuid
from typing import List
import asyncio
context = {'jobs': {}}
app = FastAPI()
async def do_work(job_key, files=None):
iter_over = files if files else range(100)
for file, file_number in enumerate(iter_over):
jobs = context['jobs']
job_info = jobs[job_key]
job_info['iteration'] = file_number
job_info['status'] = 'inprogress'
await asyncio.sleep(1)
pending_jobs[job_key]['status'] = 'done'
@app.post('/work/test')
async def testing(files: List[UploadFile]):
identifier = str(uuid.uuid4())
context[jobs][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/')
async def get_testing():
identifier = str(uuid.uuid4())
context['jobs'][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/status')
def status():
return {
'all': list(context['jobs'].values()),
}
@app.get('/status/{identifier}')
async def status(identifier):
return {
"status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
}
Podejścia
Ankieta
Najbardziej preferowanym sposobem śledzenia postępów w zadaniu jest sondowanie:
- Po otrzymaniu prośby o
request
rozpoczęcie zadania na zapleczu:- Utwórz
task object
w pamięci (np. W pamięciredis
itp.).task object
Musi zawierać następujące dane:task ID
,status
(w toku, zakończone),result
i innych. - Wykonywania zadania w tle (współprogram, gwintowanie wieloprocesorowe, kolejka zadania, takie jak Celery, arq, aio-pika, dramatiqitp)
- Odpowiedz natychmiast na odpowiedź
202 (Accepted)
zwracając otrzymaną wcześniejtask ID
.
- Utwórz
- Zaktualizuj stan zadania:
- Może to pochodzić z samego zadania, jeśli wie o magazynie zadań i ma do niego dostęp. Samo zadanie okresowo aktualizuje informacje o sobie.
- Lub używać monitora zadania (
Observer
,producer-consumer
wzorzec), który będzie monitorować status zadania i jego wynik. Zaktualizuje również informacje w pamięci.
- Na
client side
(front-end
) rozpocznij cykl sondowania dla statusu zadania do punktu końcowego/task/{ID}/status
, który pobiera informacje z magazynu zadań.
Odpowiedź strumieniowa
Przesyłanie strumieniowe jest mniej wygodnym sposobem okresowego sprawdzania stanu przetwarzania żądań. Kiedy stopniowo wciskamy odpowiedzi bez zamykania połączenia. Ma szereg istotnych wad, na przykład w przypadku zerwania połączenia można utracić informacje. Streaming Api to inne podejście niż REST Api.
Websockets
Możesz także używać websockets do powiadomień w czasie rzeczywistym i komunikacji dwukierunkowej.
Spinki do mankietów:
- Przykłady podejścia do odpytywania paska postępu oraz bardziej szczegółowy opis
django + celery
można znaleźć pod tymi linkami:
https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html
https://buildwithdjango.com/blog/post/celery-progress-bars/
- Podałem tutaj uproszczone przykłady uruchamiania zadań w tle w FastAPI przy użyciu wieloprocesowości:
https://stackoverflow.com/a/63171013/13782669
Stara odpowiedź:
Możesz uruchomić zadanie w tle, zwrócić je id
i podać /status
punkt końcowy, który front będzie okresowo wywoływał. W odpowiedzi dotyczącej statusu możesz zwrócić aktualny stan zadania (na przykład oczekujące z numerem aktualnie przetwarzanego pliku). Podałem tutaj kilka prostych przykładów .
Próbny
Ankieta
Demo podejścia wykorzystującego zadania asyncio (rozwiązanie dla pojedynczego pracownika):
import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
progress: int = 0
result: int = None
app = FastAPI()
jobs: Dict[UUID, Job] = {} # Dict as job storage
async def long_task(queue: asyncio.Queue, param: int):
for i in range(1, param): # do work and return our progress
await asyncio.sleep(1)
await queue.put(i)
await queue.put(None)
async def start_new_task(uid: UUID, param: int) -> None:
queue = asyncio.Queue()
task = asyncio.create_task(long_task(queue, param))
while progress := await queue.get(): # monitor task progress
jobs[uid].progress = progress
jobs[uid].status = "complete"
@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(start_new_task, new_task.uid, param)
return new_task
@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
return jobs[uid]
Przykład dostosowany do pętli z pytania
Funkcja przetwarzania w tle jest zdefiniowana jako def
i FastAPI uruchamia ją w puli wątków.
import time
from http import HTTPStatus
from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
processed_files: List[str] = Field(default_factory=list)
app = FastAPI()
jobs: Dict[UUID, Job] = {}
def process_files(task_id: UUID, files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
jobs[task_id].processed_files.append(i.filename)
jobs[task_id].status = "completed"
@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(process_files, new_task.uid, files)
return new_task
@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
return jobs[uid]
Streaming
async def process_files_gen(files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
yield f"{i.filename} processed\n"
yield f"OK\n"
@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
return StreamingResponse(process_files_gen(files))