Jak przesyłać postęp operacji w aplikacji FastAPI?

Nov 19 2020

Wdrożyłem punkt końcowy fastapi,

from fastapi import FastAPI, UploadFile
from typing import List

app = FastAPI()

@app.post('/work/test')
async def testing(files: List(UploadFile)):
    for i in files:
        .......
        # do a lot of operations on each file

        # after than I am just writing that processed data into mysql database
        # cur.execute(...)
        # cur.commit()
        .......
    
    # just returning "OK" to confirm data is written into mysql
    return {"response" : "OK"}

Mogę zażądać danych wyjściowych z punktu końcowego API i działa dobrze dla mnie.

Teraz największym wyzwaniem dla mnie jest wiedzieć, ile czasu zajmuje każda iteracja. Ponieważ w części UI (ci, którzy uzyskują dostęp do mojego punktu końcowego API) chcę im pomóc w wyświetlaniu paska postępu (CZAS ZABRONIONY) dla każdej przetwarzanej iteracji / pliku.

Czy jest jakikolwiek sposób, aby to osiągnąć? Jeśli tak, pomóż mi, jak mogę dalej postępować?

Dziękuję Ci.

Odpowiedzi

2 AndriyIvaneyko Nov 28 2020 at 01:31

Poniżej znajduje się rozwiązanie wykorzystujące unikalne identyfikatory oraz dostępny globalnie słownik przechowujący informacje o ofertach pracy:

UWAGA: Poniższy kod jest bezpieczny w użyciu, dopóki nie użyjesz wartości kluczy dynamicznych (w używanym przykładowym identyfikatorze użytkownika) i nie utrzymasz aplikacji w ramach jednego procesu.

  1. Aby uruchomić aplikację, utwórz plik main.py
  2. Biegać uvicorn main:app --reload
  3. Utwórz pozycję pracy, uzyskując dostęp http://127.0.0.1:8000/
  4. Powtórz krok 3, aby utworzyć wiele zadań
  5. Przejdź do http://127.0.0.1/statusstrony, aby zobaczyć statusy stron.
  6. Idź do, http://127.0.0.1/status/{identifier}aby zobaczyć postęp pracy według identyfikatora pracy.

Kod aplikacji:

from fastapi import FastAPI, UploadFile
import uuid
from typing import List


import asyncio


context = {'jobs': {}}

app = FastAPI()



async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file, file_number in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    pending_jobs[job_key]['status'] = 'done'


@app.post('/work/test')
async def testing(files: List[UploadFile]):
    identifier = str(uuid.uuid4())
    context[jobs][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}

@app.get('/status')
def status():
    return {
        'all': list(context['jobs'].values()),
    }

@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }

3 alex_noname Nov 19 2020 at 18:41

Podejścia

Ankieta

Najbardziej preferowanym sposobem śledzenia postępów w zadaniu jest sondowanie:

  1. Po otrzymaniu prośby o requestrozpoczęcie zadania na zapleczu:
    1. Utwórz task objectw pamięci (np. W pamięci redisitp.). task objectMusi zawierać następujące dane: task ID, status(w toku, zakończone), resulti innych.
    2. Wykonywania zadania w tle (współprogram, gwintowanie wieloprocesorowe, kolejka zadania, takie jak Celery, arq, aio-pika, dramatiqitp)
    3. Odpowiedz natychmiast na odpowiedź 202 (Accepted)zwracając otrzymaną wcześniej task ID.
  2. Zaktualizuj stan zadania:
    1. Może to pochodzić z samego zadania, jeśli wie o magazynie zadań i ma do niego dostęp. Samo zadanie okresowo aktualizuje informacje o sobie.
    2. Lub używać monitora zadania ( Observer, producer-consumerwzorzec), który będzie monitorować status zadania i jego wynik. Zaktualizuje również informacje w pamięci.
  3. Na client side( front-end) rozpocznij cykl sondowania dla statusu zadania do punktu końcowego /task/{ID}/status, który pobiera informacje z magazynu zadań.

Odpowiedź strumieniowa

Przesyłanie strumieniowe jest mniej wygodnym sposobem okresowego sprawdzania stanu przetwarzania żądań. Kiedy stopniowo wciskamy odpowiedzi bez zamykania połączenia. Ma szereg istotnych wad, na przykład w przypadku zerwania połączenia można utracić informacje. Streaming Api to inne podejście niż REST Api.

Websockets

Możesz także używać websockets do powiadomień w czasie rzeczywistym i komunikacji dwukierunkowej.

Spinki do mankietów:

  • Przykłady podejścia do odpytywania paska postępu oraz bardziej szczegółowy opis django + celerymożna znaleźć pod tymi linkami:

https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html

https://buildwithdjango.com/blog/post/celery-progress-bars/

  • Podałem tutaj uproszczone przykłady uruchamiania zadań w tle w FastAPI przy użyciu wieloprocesowości:

https://stackoverflow.com/a/63171013/13782669

Stara odpowiedź:

Możesz uruchomić zadanie w tle, zwrócić je idi podać /statuspunkt końcowy, który front będzie okresowo wywoływał. W odpowiedzi dotyczącej statusu możesz zwrócić aktualny stan zadania (na przykład oczekujące z numerem aktualnie przetwarzanego pliku). Podałem tutaj kilka prostych przykładów .

Próbny

Ankieta

Demo podejścia wykorzystującego zadania asyncio (rozwiązanie dla pojedynczego pracownika):

import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}  # Dict as job storage


async def long_task(queue: asyncio.Queue, param: int):
    for i in range(1, param):  # do work and return our progress
        await asyncio.sleep(1)
        await queue.put(i)
    await queue.put(None)


async def start_new_task(uid: UUID, param: int) -> None:

    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))

    while progress := await queue.get():  # monitor task progress
        jobs[uid].progress = progress

    jobs[uid].status = "complete"


@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task


@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Przykład dostosowany do pętli z pytania

Funkcja przetwarzania w tle jest zdefiniowana jako defi FastAPI uruchamia ją w puli wątków.

import time
from http import HTTPStatus

from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    processed_files: List[str] = Field(default_factory=list)


app = FastAPI()
jobs: Dict[UUID, Job] = {}


def process_files(task_id: UUID, files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        jobs[task_id].processed_files.append(i.filename)
    jobs[task_id].status = "completed"


@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task


@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Streaming

async def process_files_gen(files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        yield f"{i.filename} processed\n"
    yield f"OK\n"


@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
    return StreamingResponse(process_files_gen(files))