Wie sende ich einen Betriebsfortschritt in einer FastAPI-App?

Nov 19 2020

Ich habe einen Fastapi-Endpunkt bereitgestellt.

from fastapi import FastAPI, UploadFile
from typing import List

app = FastAPI()

@app.post('/work/test')
async def testing(files: List(UploadFile)):
    for i in files:
        .......
        # do a lot of operations on each file

        # after than I am just writing that processed data into mysql database
        # cur.execute(...)
        # cur.commit()
        .......
    
    # just returning "OK" to confirm data is written into mysql
    return {"response" : "OK"}

Ich kann die Ausgabe vom API-Endpunkt anfordern und es funktioniert perfekt für mich.

Nun ist es für mich die größte Herausforderung zu wissen, wie viel Zeit für jede Iteration benötigt wird. Weil ich im UI-Teil (diejenigen, die auf meinen API-Endpunkt zugreifen) ihnen helfen möchte, für jede Iteration / Datei, die verarbeitet wird, einen Fortschrittsbalken (TIME TAKEN) anzuzeigen.

Gibt es eine Möglichkeit für mich, dies zu erreichen? Wenn ja, helfen Sie mir bitte, wie ich weiter vorgehen kann.

Vielen Dank.

Antworten

2 AndriyIvaneyko Nov 28 2020 at 01:31

Im Folgenden finden Sie eine Lösung, die eindeutige Bezeichner und ein global verfügbares Wörterbuch verwendet, das Informationen zu den Jobs enthält:

HINWEIS: Der folgende Code kann sicher verwendet werden, bis Sie die Werte für dynamische Schlüssel verwenden (in der verwendeten Beispiel-UUID) und die Anwendung innerhalb eines einzelnen Prozesses beibehalten.

  1. Um die App zu starten, erstellen Sie eine Datei main.py
  2. Lauf uvicorn main:app --reload
  3. Erstellen Sie einen Jobeintrag, indem Sie auf zugreifen http://127.0.0.1:8000/
  4. Wiederholen Sie Schritt 3, um mehrere Jobs zu erstellen
  5. Gehen Sie zur http://127.0.0.1/statusSeite, um den Seitenstatus anzuzeigen.
  6. Gehen Sie zu http://127.0.0.1/status/{identifier}, um den Fortschritt des Jobs anhand der Job-ID anzuzeigen.

Code der App:

from fastapi import FastAPI, UploadFile
import uuid
from typing import List


import asyncio


context = {'jobs': {}}

app = FastAPI()



async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file, file_number in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    pending_jobs[job_key]['status'] = 'done'


@app.post('/work/test')
async def testing(files: List[UploadFile]):
    identifier = str(uuid.uuid4())
    context[jobs][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}

@app.get('/status')
def status():
    return {
        'all': list(context['jobs'].values()),
    }

@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }

3 alex_noname Nov 19 2020 at 18:41

Nähert sich

Umfragen

Der am meisten bevorzugte Ansatz, um den Fortschritt einer Aufgabe zu verfolgen, ist die Abfrage:

  1. Nach Erhalt von a request, um eine Aufgabe in einem Backend zu starten:
    1. Erstellen Sie eine task objectim Speicher (z. B. im Speicher redisusw.). Das task objectmuss die folgenden Daten enthalten : task ID, status(ausstehend, abgeschlossen) resultund andere.
    2. Führen Aufgabe im Hintergrund (Koroutinen, Gewindeschneiden, Multiprocessing, Task - Warteschlange wie Celery, arq, aio-pika, dramatiqund etc.)
    3. Antwort sofort die Antwort 202 (Accepted)durch Rücksendung der zuvor empfangenen task ID.
  2. Aufgabenstatus aktualisieren:
    1. Dies kann innerhalb der Aufgabe selbst erfolgen, wenn sie den Aufgabenspeicher kennt und Zugriff darauf hat. In regelmäßigen Abständen aktualisiert die Aufgabe selbst Informationen über sich selbst.
    2. Oder verwenden Sie einen Aufgabenmonitor ( Observer, producer-consumerMuster), der den Status der Aufgabe und ihr Ergebnis überwacht. Außerdem werden die Informationen im Speicher aktualisiert.
  3. Starten Sie am client side( front-end) einen Abfragezyklus für den Taskstatus zum Endpunkt /task/{ID}/status, der Informationen aus dem Taskspeicher entnimmt.

Streaming-Antwort

Streaming ist eine weniger bequeme Methode, um den Status der Anforderungsverarbeitung regelmäßig abzurufen. Wenn wir nach und nach Antworten senden, ohne die Verbindung zu schließen. Es hat eine Reihe von erheblichen Nachteilen. Wenn beispielsweise die Verbindung unterbrochen wird, können Sie Informationen verlieren. Streaming Api ist ein anderer Ansatz als REST Api.

Websockets

Sie können Websockets auch für Echtzeitbenachrichtigungen und bidirektionale Kommunikation verwenden.

Links:

  • Beispiele für den Abfrageansatz für den Fortschrittsbalken und eine detailliertere Beschreibung für django + celeryfinden Sie unter diesen Links:

https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html

https://buildwithdjango.com/blog/post/celery-progress-bars/

  • Ich habe hier vereinfachte Beispiele für die Ausführung von Hintergrundaufgaben in FastAPI mithilfe von Multiprocessing bereitgestellt:

https://stackoverflow.com/a/63171013/13782669

Alte Antwort:

Sie können eine Aufgabe im Hintergrund ausführen, sie zurückgeben idund einen /statusEndpunkt bereitstellen , den die Front regelmäßig aufruft. In der Statusantwort können Sie den Status Ihrer Aufgabe zurückgeben (z. B. ausstehend mit der Nummer der aktuell verarbeiteten Datei). Ich zur Verfügung gestellt , ein paar einfachen Beispiele hier .

Demo

Umfragen

Demo des Ansatzes mit Asyncio-Tasks (Single-Worker-Lösung):

import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}  # Dict as job storage


async def long_task(queue: asyncio.Queue, param: int):
    for i in range(1, param):  # do work and return our progress
        await asyncio.sleep(1)
        await queue.put(i)
    await queue.put(None)


async def start_new_task(uid: UUID, param: int) -> None:

    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))

    while progress := await queue.get():  # monitor task progress
        jobs[uid].progress = progress

    jobs[uid].status = "complete"


@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task


@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Angepasstes Beispiel für Schleife aus Frage

Die Hintergrundverarbeitungsfunktion ist definiert als defund wird von FastAPI im Thread-Pool ausgeführt.

import time
from http import HTTPStatus

from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    processed_files: List[str] = Field(default_factory=list)


app = FastAPI()
jobs: Dict[UUID, Job] = {}


def process_files(task_id: UUID, files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        jobs[task_id].processed_files.append(i.filename)
    jobs[task_id].status = "completed"


@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task


@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Streaming

async def process_files_gen(files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        yield f"{i.filename} processed\n"
    yield f"OK\n"


@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
    return StreamingResponse(process_files_gen(files))