Python 3 asyncio con aioboto3 sembra sequenziale

Aug 28 2020

Sto portando un semplice script python 3 su AWS Lambda. Lo script è semplice: raccoglie informazioni da una dozzina di oggetti S3 e restituisce i risultati.

Lo script utilizzato multiprocessing.Poolper raccogliere tutti i file in parallelo. Sebbene multiprocessingnon possa essere utilizzato in un ambiente AWS Lambda poiché /dev/shmmanca. Quindi ho pensato invece di scrivere uno sporco multiprocessing.Process/ multiprocessing.Queuesostitutivo, avrei provato asyncioinvece.

Sto usando l'ultima versione di aioboto3(8.0.5) su Python 3.8.

Il mio problema è che non riesco a ottenere alcun miglioramento tra un download sequenziale ingenuo dei file e un loop di eventi asincrono che multiplexing i download.

Ecco le due versioni del mio codice.

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import boto3
import aioboto3

BUCKET = 'some-bucket'
KEYS = [
    'some/key/1',
    [...]
    'some/key/10',
]

async def download_aio():
    """Concurrent download of all objects from S3"""
    async with aioboto3.client('s3') as s3:
        objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
        objects = await asyncio.gather(*objects)
        buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
    """Sequentially download all objects from S3"""
    s3 = boto3.client('s3')
    for key in KEYS:
        object = s3.get_object(Bucket=BUCKET, Key=key)
        object['Body'].read()

def run_sequential():
    download()

def run_concurrent():
    loop = asyncio.get_event_loop()
    #loop.set_default_executor(ProcessPoolExecutor(10))
    #loop.set_default_executor(ThreadPoolExecutor(10))
    loop.run_until_complete(download_aio())

I tempi per entrambi run_sequential()e run_concurrent()sono abbastanza simili (~ 3 secondi per una dozzina di file da 10 MB). Sono convinto che la versione concorrente non lo sia, per molteplici ragioni:

  • Ho provato a passare a Process/ThreadPoolExecutore ho generato i processi / thread per la durata della funzione, anche se non stanno facendo nulla
  • Il tempo tra sequenziale e simultaneo è molto vicino allo stesso, anche se la mia interfaccia di rete non è decisamente satura e nemmeno la CPU è vincolata
  • Il tempo impiegato dalla versione concorrente aumenta linearmente con il numero di file.

Sono sicuro che manchi qualcosa, ma non riesco a capire cosa.

Qualche idea?

Risposte

NewbiZ Aug 28 2020 at 11:50

Dopo aver perso alcune ore cercando di capire come usare aioboto3correttamente, ho deciso di passare alla mia soluzione di backup. Ho finito per lanciare la mia versione ingenua di multiprocessing.Poolda utilizzare all'interno di un ambiente AWS lambda.

Se qualcuno dovesse imbattersi in questo thread in futuro, eccolo qui. È tutt'altro che perfetto, ma abbastanza facile da sostituire così multiprocessing.Poolcom'è per i miei casi semplici.

from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


class Pool:
    """Naive implementation of a process pool with mp.Pool API.

    This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
    is not mounted in an AWS Lambda environment.
    """

    def __init__(self, process_count=1):
        assert process_count >= 1
        self.process_count = process_count

    @staticmethod
    def wrap_pipe(pipe, index, func):
        def wrapper(args):
            try:
                result = func(args)
            except Exception as exc:  # pylint: disable=broad-except
                result = exc
            pipe.send((index, result))
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def map(self, function, arguments):
        pending = list(enumerate(arguments))
        running = []
        finished = [None] * len(pending)
        while pending or running:
            # Fill the running queue with new jobs
            while len(running) < self.process_count:
                if not pending:
                    break
                index, args = pending.pop(0)
                pipe_parent, pipe_child = Pipe(False)
                process = Process(
                    target=Pool.wrap_pipe(pipe_child, index, function),
                    args=(args, ))
                process.start()
                running.append((index, process, pipe_parent))
            # Wait for jobs to finish
            for pipe in wait(list(map(lambda t: t[2], running))):
                index, result = pipe.recv()
                # Remove the finished job from the running list
                running = list(filter(lambda x: x[0] != index, running))
                # Add the result to the finished list
                finished[index] = result

        return finished