एक पृष्ठभूमि कार्य में कई श्रमिकों का उपयोग करना - फास्ट-एपीआई
मैं उपयोगकर्ता द्वारा अपलोड की गई फ़ाइल को संसाधित करने का प्रयास कर रहा हूं। हालाँकि, मैं चाहता हूं कि अपलोड पूरा होने के बाद उपयोगकर्ता को एक प्रतिक्रिया मिले और कनेक्शन समाप्त हो जाए, लेकिन फ़ाइल को संसाधित करना जारी रखें। इसलिए मैं BackgroundTasks.add_tasks का उपयोग कर रहा हूं और मेरा कोड कुछ इस तरह दिखता है:
class Line(BaseModel):
line: str
@app.post("/foo")
async def foo(line: Line):
""" Processing line generate results"""
...
result = ... # processing line.line
print(results)
return results
@app.post("/upload")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
background_tasks.add_task(process, csv)
return response.text("CSV has been uploaded successfully")
async def process(csv):
""" Processing CSV and generate data"""
tasks = [foo(line) for line in csv]
result = await asyncio.gather(*tasks)
दुर्भाग्य से, उपरोक्त कोड केवल एक-एक को निष्पादित करता है। इसके अलावा, मुझे तब तक इंतजार करना होगा जब तक सभी परिणाम संसाधित न हो जाएं और फिर फू कार्यों में स्टेटमेंट प्रिंट कर दें, यानी मान लीजिए कि मेरे पास सीएसवी में एन लाइनें हैं, सभी एन प्रोसेस होने के बाद जब मैं सभी के लिए प्रिंट स्टेटमेंट देखता हूं। मेरा कार्यक्रम 20 श्रमिकों पर चलता है, लेकिन जब यह प्रक्रिया चल रही होती है, तो यह केवल 1% CPU का उपयोग करता है (foo एक संगणना कार्य नहीं है, यह एक IO / Network बाध्य कार्य है)। इससे मुझे लगता है कि पृष्ठभूमि की प्रक्रिया केवल 1 कार्यकर्ता पर चल रही है। मैंने ProcessPoolExecutor को इस प्रकार आज़माया:
loop = asyncio.get_event_loop()
lines = [line_0, line_1, ..., line_n] # Extracted all lines from CSV
with ProcessPoolExecutor() as executor:
results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
results = loop.run_until_complete(*results)
हालाँकि, मुझे निम्न त्रुटि मिलती है:
processpoolexecutor स्थानीय ऑब्जेक्ट को अचार नहीं कर सकता है
मैंने अपना दृष्टिकोण बदलकर उस त्रुटि को प्राप्त करने का प्रबंधन किया:
results = [loop.run_in_executor(executor, lambda: foo(line)) for line in lines]
सेवा:
results = [asyncio.ensure_future(foo(line=Line(line)) for line in lines]
हालाँकि, तब मुझे यह त्रुटि मिलती है:
फ़ाइल "uvloop / loop.pyx", पंक्ति 2658, uvloop.loop.Loop.run_in_executor AttributeError में: 'लूप' ऑब्जेक्ट में कोई विशेषता 'सबमिट' नहीं है
संक्षेप में: एक पंक्ति को संसाधित करने के लिए, मैं "/ foo" समापन बिंदु हिट कर सकता हूं । अब, मैं 200 लाइनों की एक सीएसवी प्रक्रिया करना चाहता हूं। इसलिए पहले मैं उपयोगकर्ता से फ़ाइल स्वीकार करता हूं और एक सफलता संदेश वापस करता हूं और उस कनेक्शन को समाप्त करता हूं। फिर csv को एक बैकग्राउंड टास्क में जोड़ा जाता है, जिसे प्रत्येक लाइन को "/ foo" एंडपॉइंट पर मैप करना चाहिए और मुझे प्रत्येक लाइन के लिए परिणाम देना चाहिए। हालाँकि, मैंने अब तक जितने भी तरीके आज़माए हैं, वे केवल एक ही धागे का उपयोग कर रहे हैं और प्रत्येक पंक्ति को एक-एक करके संसाधित कर रहे हैं। मैं एक दृष्टिकोण चाहूंगा जहां मैं कई पंक्तियों को एक साथ संसाधित कर सकता हूं, लगभग जैसे कि मैं "/ foo" समापन बिंदु को एक साथ कई बार मार रहा हूं जैसे हम अपाचे जेमीटर जैसे उपकरणों का उपयोग कर सकते हैं।
जवाब
आप समापन बिंदु का उपयोग किए बिना समानांतर में प्रसंस्करण कर सकते हैं। foo
आपके कोड के आधार पर नीचे एक सरल उदाहरण दिया गया है ( समापन बिंदु का उपयोग किए बिना ):
import asyncio
import sys
import uvicorn
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
from loguru import logger
logger.remove()
logger.add(sys.stdout, colorize=True, format="<green>{time:HH:mm:ss}</green> | {level} | <level>{message}</level>")
app = FastAPI()
async def async_io_bound(line: str):
await asyncio.sleep(3) # Pretend this is IO operations
return f"Line '{line}' processed"
async def process(csv):
""" Processing CSV and generate data"""
tasks = [async_io_bound(line) for line in csv]
logger.info("start processing")
result = await asyncio.gather(*tasks)
for i in result:
logger.info(i)
@app.post("/upload-to-process")
async def upload(background_tasks: BackgroundTasks, csv: UploadFile = File(...)):
background_tasks.add_task(process, csv.file)
return {"result": "CSV has been uploaded successfully"}
if __name__ == "__main__":
uvicorn.run("app3:app", host="localhost", port=8001)
आउटपुट का उदाहरण (सभी लाइनें समानांतर में संसाधित हुईं):
INFO: ::1:52358 - "POST /upload-to-process HTTP/1.1" 200 OK
13:21:31 | INFO | start processing
13:21:34 | INFO | Line 'b'one, two\n'' processed
13:21:34 | INFO | Line 'b'0, 1\n'' processed
13:21:34 | INFO | Line 'b'1, 1\n'' processed
13:21:34 | INFO | Line 'b'2, 1\n'' processed
13:21:34 | INFO | Line 'b'3, 1\n'' processed
13:21:34 | INFO | Line 'b'4, 1\n'' processed
13:21:34 | INFO | Line 'b'5, 1\n'' processed
13:21:34 | INFO | Line 'b'6, 1\n'' processed
13:21:34 | INFO | Line 'b'7, 1\n'' processed
13:21:34 | INFO | Line 'b'8, 1\n'' processed
13:21:34 | INFO | Line 'b'9, 1\n'' processed