Por que meu TPL Dataflow Pipeline é mais lento na leitura de arquivos CSV enormes em comparação com apenas em loop?

Nov 25 2020

Portanto, meu requisito é ler vários arquivos CSV (cada um com um mínimo de um milhão de linhas) e, em seguida, analisar cada linha. Atualmente, da maneira como desmembramos meu pipeline, estou criando primeiro um pipeline separado para apenas ler um arquivo CSV em uma string [] e, em seguida, planejo criar o pipeline de análise mais tarde.

Mas, vendo os resultados do meu Pipeline de leitura de arquivos, fico perplexo porque é consideravelmente mais lento do que apenas fazer um loop no arquivo CSV e, em seguida, fazer um loop nas linhas.

static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});

        var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
        {
            using (var fileStream = File.OpenRead(filePath)) {
                using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
                    string line;
                    while ((line = streamReader.ReadLine()) != null) {
                        var isCompleted = await lineBufferBlock.SendAsync(line);
                        while (!isCompleted)
                        {
                            isCompleted = await lineBufferBlock.SendAsync(line);
                        }
                    }
                }
            }
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        fileReadingBlock.Completion.ContinueWith((task) =>
        {
            lineBufferBlock.Complete();
        });

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

E então eu finalmente consumi da seguinte maneira

        for (int i = 1; i < 5; i++) {
            var filePath = $"C:\\Users\\File{i}.csv";
            fileReadingPipeline.SendAsync(filePath);
        }
        fileReadingPipeline.Complete();
        while (true) {
            try {
                var outputRows = fileReadingPipeline.Receive();
                foreach (string word in outputRows)
                {

                }
            }
            catch (InvalidOperationException e) {
                break;
            }
        }

Enquanto meu código de loop direto é o seguinte:

        for (int i = 1; i < 5; i++) {

            var filePath = $"C:\\Users\\File{i}.csv";
            foreach (string row in File.ReadLines(filePath))
            {
                foreach (string word in row.Split(","))
                {

                }

            }

        }

A diferença no desempenho cai para ~ 15 segundos para o TPL Dataflow, enquanto é ~ 5s para o código de loop.

EDITAR

Seguindo um conselho melhor dos comentários, removi o lineBufferBlock desnecessário do pipeline e este é meu código agora. No entanto, o desempenho ainda permanece o mesmo.

            var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

Respostas

TheodorZoulias Nov 26 2020 at 00:41

Ao configurar um pipeline, você deve ter em mente os recursos do hardware que fará o trabalho. O TPL Dataflow não está fazendo o trabalho sozinho, ele está delegando-o à CPU, ao HDD / SSD, à placa de rede etc. Por exemplo, ao ler arquivos de um disco rígido, provavelmente é inútil instruir o TPL a ler dados de 8 arquivos simultaneamente, porque a cabeça do braço mecânico do HDD não pode estar fisicamente localizada em 8 lugares ao mesmo tempo. Isso se resume ao fato de que a leitura de arquivos de sistemas de arquivos não é particularmente amigável em paralelo. É um pouco melhor no caso de SSDs, mas você terá que testá-lo caso a caso.

Outro problema com a paralelização é a granularidade. Você deseja que a carga de trabalho seja grande, não granular. Caso contrário, o custo de passar mensagens de buffer para buffer e colocar barreiras de memória em cada transferência para garantir a visibilidade cross-thread pode anular quaisquer benefícios que você possa esperar do emprego do paralelismo. Dica: dividir um stringem partes é uma operação altamente granular.

Aqui está uma maneira de fazer isso:

using static MoreLinq.Extensions.BatchExtension;

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});

var parser = new TransformBlock<string[], string[][]>(lines =>
{
    return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

Este exemplo usa o Batchoperador do MoreLinqpacote para passar as linhas em lotes de 100, em vez de passá-las uma por uma. Você pode encontrar outras opções de envio em lote aqui .


Update: Mais uma sugestão é aumentar o número mínimo de threads que o ThreadPoolcria on demand ( SetMinThreads). Caso contrário, o ThreadPoolserá imediatamente saturado pela MaxDegreeOfParallelism = Environment.ProcessorCountconfiguração, o que causará atrasos pequenos, mas perceptíveis (500 mseg), devido à preguiça intencional do ThreadPoolalgoritmo de injeção de thread.

ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
    Environment.ProcessorCount * 2);

Basta chamar este método uma vez no início do programa.