¿Por qué mi TPL Dataflow Pipeline es más lento para leer archivos CSV de gran tamaño en comparación con los bucles?

Nov 25 2020

Entonces, mi requisito es leer varios archivos CSV (cada uno con un mínimo de un millón de filas) y luego analizar cada línea. Actualmente, de la forma en que he dividido mi canalización, primero estoy creando una canalización separada para leer un archivo CSV en una cadena [] y luego planeo crear la canalización de análisis más tarde.

Pero al ver los resultados de mi canalización de lectura de archivos, me quedo estupefacto porque es considerablemente más lento que simplemente recorrer el archivo CSV y luego recorrer las filas.

static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});

        var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
        {
            using (var fileStream = File.OpenRead(filePath)) {
                using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
                    string line;
                    while ((line = streamReader.ReadLine()) != null) {
                        var isCompleted = await lineBufferBlock.SendAsync(line);
                        while (!isCompleted)
                        {
                            isCompleted = await lineBufferBlock.SendAsync(line);
                        }
                    }
                }
            }
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        fileReadingBlock.Completion.ContinueWith((task) =>
        {
            lineBufferBlock.Complete();
        });

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

Y luego finalmente lo consumo de la siguiente manera

        for (int i = 1; i < 5; i++) {
            var filePath = $"C:\\Users\\File{i}.csv";
            fileReadingPipeline.SendAsync(filePath);
        }
        fileReadingPipeline.Complete();
        while (true) {
            try {
                var outputRows = fileReadingPipeline.Receive();
                foreach (string word in outputRows)
                {

                }
            }
            catch (InvalidOperationException e) {
                break;
            }
        }

Mientras que mi código de bucle directo es el siguiente:

        for (int i = 1; i < 5; i++) {

            var filePath = $"C:\\Users\\File{i}.csv";
            foreach (string row in File.ReadLines(filePath))
            {
                foreach (string word in row.Split(","))
                {

                }

            }

        }

La diferencia en el rendimiento se reduce a ~ 15 segundos para TPL Dataflow, mientras que es ~ 5 segundos para el código de bucle.

EDITAR

Siguiendo un mejor consejo de los comentarios, eliminé el lineBufferBlock innecesario de la canalización y este es mi código ahora. Sin embargo, el rendimiento sigue siendo el mismo.

            var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

Respuestas

TheodorZoulias Nov 26 2020 at 00:41

Cuando configura una canalización, debe tener en cuenta las capacidades del hardware que va a hacer el trabajo. El flujo de datos de TPL no está haciendo el trabajo por sí solo, lo está delegando en la CPU, el HDD / SSD, la tarjeta de red, etc. 8 archivos al mismo tiempo, porque el cabezal del brazo mecánico del HDD no se puede ubicar físicamente en 8 lugares al mismo tiempo. Esto se reduce al hecho de que leer archivos de sistemas de archivos no es particularmente compatible con el paralelo. Es un poco mejor en el caso de los SSD, pero tendrá que probarlo caso por caso.

Otro problema con la paralelización es la granularidad. Desea que la carga de trabajo sea gruesa, no granular. De lo contrario, el costo de pasar mensajes de búfer a búfer y colocar barreras de memoria alrededor de cada transferencia para garantizar la visibilidad entre subprocesos puede anular cualquier beneficio que pueda esperar del empleo del paralelismo. Consejo: dividir un solo stringen partes es una operación muy granular.

He aquí una forma de hacerlo:

using static MoreLinq.Extensions.BatchExtension;

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});

var parser = new TransformBlock<string[], string[][]>(lines =>
{
    return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

Este ejemplo utiliza el Batchoperador del MoreLinqpaquete para pasar las líneas en lotes de 100, en lugar de pasarlas una por una. Puede encontrar otras opciones de procesamiento por lotes aquí .


Actualización: una sugerencia más es aumentar la cantidad mínima de subprocesos que ThreadPoolcrea bajo demanda ( SetMinThreads). De lo contrario ThreadPool, la MaxDegreeOfParallelism = Environment.ProcessorCountconfiguración lo saturará inmediatamente , lo que provocará retrasos pequeños pero notables (500 mseg), debido a la pereza intencional del ThreadPoolalgoritmo de inyección de subprocesos.

ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
    Environment.ProcessorCount * 2);

Basta con llamar a este método una vez al inicio del programa.