FastAPI 앱에서 작업 진행 상황을 보내는 방법은 무엇입니까?
fastapi 엔드 포인트를 배포했습니다.
from fastapi import FastAPI, UploadFile
from typing import List
app = FastAPI()
@app.post('/work/test')
async def testing(files: List(UploadFile)):
for i in files:
.......
# do a lot of operations on each file
# after than I am just writing that processed data into mysql database
# cur.execute(...)
# cur.commit()
.......
# just returning "OK" to confirm data is written into mysql
return {"response" : "OK"}
API 끝점에서 출력을 요청할 수 있으며 완벽하게 작동합니다.
이제 각 반복에 걸리는 시간을 아는 것이 가장 큰 도전입니다. UI 부분 (내 API 엔드 포인트에 액세스하는 사용자)에서 처리중인 각 반복 / 파일에 대한 진행률 표시 줄 (TIME TAKEN)을 표시하도록 돕고 싶습니다.
내가 그것을 달성 할 수있는 방법이 있습니까? 그렇다면 더 진행할 수있는 방법을 알려주세요.
감사합니다.
답변
다음은 고유 식별자와 작업에 대한 정보를 보유하는 전역 적으로 사용 가능한 사전을 사용하는 솔루션입니다.
참고 : 아래 코드는 동적 키 값 (사용중인 샘플 uuid에서)을 사용하고 단일 프로세스 내에서 응용 프로그램을 유지할 때까지 사용하는 것이 안전합니다.
- 앱을 시작하려면 파일을 만듭니다.
main.py
- 운영
uvicorn main:app --reload
- 액세스하여 작업 항목 만들기
http://127.0.0.1:8000/
- 3 단계를 반복하여 여러 작업을 만듭니다.
http://127.0.0.1/status
페이지 상태를 보려면 페이지로 이동 하십시오.http://127.0.0.1/status/{identifier}
작업 ID 별 작업 진행 상황을 보려면로 이동 하십시오.
앱 코드 :
from fastapi import FastAPI, UploadFile
import uuid
from typing import List
import asyncio
context = {'jobs': {}}
app = FastAPI()
async def do_work(job_key, files=None):
iter_over = files if files else range(100)
for file, file_number in enumerate(iter_over):
jobs = context['jobs']
job_info = jobs[job_key]
job_info['iteration'] = file_number
job_info['status'] = 'inprogress'
await asyncio.sleep(1)
pending_jobs[job_key]['status'] = 'done'
@app.post('/work/test')
async def testing(files: List[UploadFile]):
identifier = str(uuid.uuid4())
context[jobs][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/')
async def get_testing():
identifier = str(uuid.uuid4())
context['jobs'][identifier] = {}
asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
return {"identifier": identifier}
@app.get('/status')
def status():
return {
'all': list(context['jobs'].values()),
}
@app.get('/status/{identifier}')
async def status(identifier):
return {
"status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
}
구혼
투표
작업 진행 상황을 추적하는 가장 선호되는 접근 방식은 폴링입니다.
request
백엔드에서 작업을 시작하기 위해 를 받은 후 :- 를 작성
task object
(예를 들어 메모리, 저장에redis
등). 는task object
다음과 같은 데이터를 포함해야합니다task ID
,status
(출원 완료)result
, 그리고 다른 사람을. - 백그라운드에서 실행 작업 (코 루틴, 스레딩, 멀티 프로세싱 같은 태스크 큐 Celery, arq, aio-pika, dramatiq등)
202 (Accepted)
이전에받은 응답을 즉시 반환하여 응답합니다task ID
.
- 를 작성
- 작업 상태 업데이트 :
- 작업 저장소에 대해 알고 있고 액세스 권한이있는 경우 작업 자체 내에서이 작업을 수행 할 수 있습니다. 주기적으로 작업 자체는 자체 정보를 업데이트합니다.
- 또는 작업 상태 및 결과를 모니터링하는 작업 모니터 (
Observer
,producer-consumer
패턴)를 사용합니다 . 또한 저장소의 정보도 업데이트합니다.
- 에
client side
(front-end
)를 시작 폴링주기 엔드 포인트에 대한 작업 상태에 대한/task/{ID}/status
작업 저장소에서 정보를 가져.
스트리밍 응답
스트리밍 은 주기적으로 요청 처리 상태를 가져 오는 덜 편리한 방법입니다. 연결을 끊지 않고 서서히 응답을 푸시 할 때. 예를 들어 연결이 끊어지면 정보가 손실 될 수 있습니다. 스트리밍 API는 REST Api와 다른 접근 방식입니다.
웹 소켓
실시간 알림 및 양방향 통신을 위해 웹 소켓 을 사용할 수도 있습니다 .
연결:
- 진행률 표시 줄에 대한 폴링 방식의 예와에 대한 자세한 설명
django + celery
은 다음 링크에서 찾을 수 있습니다.
https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html
https://buildwithdjango.com/blog/post/celery-progress-bars/
- 여기에서는 다중 처리를 사용하여 FastAPI에서 백그라운드 작업을 실행하는 간단한 예제를 제공했습니다.
https://stackoverflow.com/a/63171013/13782669
이전 답변 :
백그라운드에서 작업을 실행하고 작업을 반환하고 프런트가 주기적으로 호출 할 엔드 포인트를 id
제공 할 수 /status
있습니다. 상태 응답에서 현재 작업 상태를 반환 할 수 있습니다 (예 : 현재 처리 된 파일 번호로 보류 중). 여기에 몇 가지 간단한 예제를 제공했습니다 .
데모
투표
asyncio 작업을 사용한 접근 방식의 데모 (단일 작업자 솔루션) :
import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
progress: int = 0
result: int = None
app = FastAPI()
jobs: Dict[UUID, Job] = {} # Dict as job storage
async def long_task(queue: asyncio.Queue, param: int):
for i in range(1, param): # do work and return our progress
await asyncio.sleep(1)
await queue.put(i)
await queue.put(None)
async def start_new_task(uid: UUID, param: int) -> None:
queue = asyncio.Queue()
task = asyncio.create_task(long_task(queue, param))
while progress := await queue.get(): # monitor task progress
jobs[uid].progress = progress
jobs[uid].status = "complete"
@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(start_new_task, new_task.uid, param)
return new_task
@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
return jobs[uid]
질문의 루프에 대한 수정 된 예
백그라운드 처리 기능은로 정의되며 def
FastAPI는 스레드 풀에서이를 실행합니다.
import time
from http import HTTPStatus
from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
processed_files: List[str] = Field(default_factory=list)
app = FastAPI()
jobs: Dict[UUID, Job] = {}
def process_files(task_id: UUID, files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
jobs[task_id].processed_files.append(i.filename)
jobs[task_id].status = "completed"
@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(process_files, new_task.uid, files)
return new_task
@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
return jobs[uid]
스트리밍
async def process_files_gen(files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
yield f"{i.filename} processed\n"
yield f"OK\n"
@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
return StreamingResponse(process_files_gen(files))