Python 3 asyncio con aioboto3 sembra sequenziale
Sto portando un semplice script python 3 su AWS Lambda. Lo script è semplice: raccoglie informazioni da una dozzina di oggetti S3 e restituisce i risultati.
Lo script utilizzato multiprocessing.Poolper raccogliere tutti i file in parallelo. Sebbene multiprocessingnon possa essere utilizzato in un ambiente AWS Lambda poiché /dev/shmmanca. Quindi ho pensato invece di scrivere uno sporco multiprocessing.Process/ multiprocessing.Queuesostitutivo, avrei provato asyncioinvece.
Sto usando l'ultima versione di aioboto3(8.0.5) su Python 3.8.
Il mio problema è che non riesco a ottenere alcun miglioramento tra un download sequenziale ingenuo dei file e un loop di eventi asincrono che multiplexing i download.
Ecco le due versioni del mio codice.
import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import boto3
import aioboto3
BUCKET = 'some-bucket'
KEYS = [
'some/key/1',
[...]
'some/key/10',
]
async def download_aio():
"""Concurrent download of all objects from S3"""
async with aioboto3.client('s3') as s3:
objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
objects = await asyncio.gather(*objects)
buffers = await asyncio.gather(*[o['Body'].read() for o in objects])
def download():
"""Sequentially download all objects from S3"""
s3 = boto3.client('s3')
for key in KEYS:
object = s3.get_object(Bucket=BUCKET, Key=key)
object['Body'].read()
def run_sequential():
download()
def run_concurrent():
loop = asyncio.get_event_loop()
#loop.set_default_executor(ProcessPoolExecutor(10))
#loop.set_default_executor(ThreadPoolExecutor(10))
loop.run_until_complete(download_aio())
I tempi per entrambi run_sequential()e run_concurrent()sono abbastanza simili (~ 3 secondi per una dozzina di file da 10 MB). Sono convinto che la versione concorrente non lo sia, per molteplici ragioni:
- Ho provato a passare a
Process/ThreadPoolExecutore ho generato i processi / thread per la durata della funzione, anche se non stanno facendo nulla - Il tempo tra sequenziale e simultaneo è molto vicino allo stesso, anche se la mia interfaccia di rete non è decisamente satura e nemmeno la CPU è vincolata
- Il tempo impiegato dalla versione concorrente aumenta linearmente con il numero di file.
Sono sicuro che manchi qualcosa, ma non riesco a capire cosa.
Qualche idea?
Risposte
Dopo aver perso alcune ore cercando di capire come usare aioboto3correttamente, ho deciso di passare alla mia soluzione di backup. Ho finito per lanciare la mia versione ingenua di multiprocessing.Poolda utilizzare all'interno di un ambiente AWS lambda.
Se qualcuno dovesse imbattersi in questo thread in futuro, eccolo qui. È tutt'altro che perfetto, ma abbastanza facile da sostituire così multiprocessing.Poolcom'è per i miei casi semplici.
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait
class Pool:
"""Naive implementation of a process pool with mp.Pool API.
This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
is not mounted in an AWS Lambda environment.
"""
def __init__(self, process_count=1):
assert process_count >= 1
self.process_count = process_count
@staticmethod
def wrap_pipe(pipe, index, func):
def wrapper(args):
try:
result = func(args)
except Exception as exc: # pylint: disable=broad-except
result = exc
pipe.send((index, result))
return wrapper
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
pass
def map(self, function, arguments):
pending = list(enumerate(arguments))
running = []
finished = [None] * len(pending)
while pending or running:
# Fill the running queue with new jobs
while len(running) < self.process_count:
if not pending:
break
index, args = pending.pop(0)
pipe_parent, pipe_child = Pipe(False)
process = Process(
target=Pool.wrap_pipe(pipe_child, index, function),
args=(args, ))
process.start()
running.append((index, process, pipe_parent))
# Wait for jobs to finish
for pipe in wait(list(map(lambda t: t[2], running))):
index, result = pipe.recv()
# Remove the finished job from the running list
running = list(filter(lambda x: x[0] != index, running))
# Add the result to the finished list
finished[index] = result
return finished