Dlaczego mój potok przepływu danych TPL wolniej odczytuje duże pliki CSV w porównaniu do zwykłego zapętlania?

Nov 25 2020

Więc moim wymaganiem jest odczytanie wielu plików CSV (każdy ma minimum milion wierszy), a następnie przeanalizowanie każdej linii. Obecnie, w taki sposób, w jaki podzieliłem mój potok, najpierw tworzę oddzielny potok, aby po prostu odczytać plik CSV do string[], a następnie planuję później utworzyć potok parsowania.

Ale widząc wyniki mojego potoku odczytu plików, jestem oszołomiony, ponieważ jest znacznie wolniejszy niż zwykłe przechodzenie przez plik CSV, a następnie przechodzenie przez wiersze.

static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});

        var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
        {
            using (var fileStream = File.OpenRead(filePath)) {
                using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
                    string line;
                    while ((line = streamReader.ReadLine()) != null) {
                        var isCompleted = await lineBufferBlock.SendAsync(line);
                        while (!isCompleted)
                        {
                            isCompleted = await lineBufferBlock.SendAsync(line);
                        }
                    }
                }
            }
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        fileReadingBlock.Completion.ContinueWith((task) =>
        {
            lineBufferBlock.Complete();
        });

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

A potem w końcu konsumuję to w następujący sposób

        for (int i = 1; i < 5; i++) {
            var filePath = $"C:\\Users\\File{i}.csv";
            fileReadingPipeline.SendAsync(filePath);
        }
        fileReadingPipeline.Complete();
        while (true) {
            try {
                var outputRows = fileReadingPipeline.Receive();
                foreach (string word in outputRows)
                {

                }
            }
            catch (InvalidOperationException e) {
                break;
            }
        }

Natomiast mój kod prostej pętli wygląda następująco:

        for (int i = 1; i < 5; i++) {

            var filePath = $"C:\\Users\\File{i}.csv";
            foreach (string row in File.ReadLines(filePath))
            {
                foreach (string word in row.Split(","))
                {

                }

            }

        }

Różnica w wydajności sprowadza się do ~15 sekund dla TPL Dataflow, podczas gdy jest to ~5s dla kodu zapętlonego.

EDYTOWAĆ

Za lepszą radą z komentarzy usunąłem niepotrzebny lineBufferBlock z potoku i to jest teraz mój kod. Jednak wydajność nadal pozostaje taka sama.

            var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

Odpowiedzi

TheodorZoulias Nov 26 2020 at 00:41

Podczas konfigurowania potoku należy mieć na uwadze możliwości sprzętu, który ma wykonać zadanie. Przepływ danych TPL nie wykonuje zadania sam, deleguje je do procesora, dysku twardego/SSD, karty sieciowej itp. Na przykład podczas odczytu plików z dysku twardego prawdopodobnie nie ma sensu nakazywać TPL odczytywania danych z 8 plików jednocześnie, ponieważ głowica mechanicznego ramienia HDD nie może być fizycznie zlokalizowana w 8 miejscach jednocześnie. Sprowadza się to do tego, że odczytywanie plików z systemów plików nie jest szczególnie przyjazne dla równoległego. W przypadku dysków SSD jest nieco lepiej, ale trzeba będzie to przetestować w każdym przypadku.

Inną kwestią związaną z równoległością jest ziarnistość. Chcesz, aby obciążenie było duże, a nie szczegółowe. W przeciwnym razie koszt przekazywania komunikatów z bufora do bufora i umieszczanie barier pamięci wokół każdego transferu w celu zapewnienia widoczności między wątkami może negować wszelkie korzyści, jakich można oczekiwać od zastosowania równoległości. Wskazówka: dzielenie pojedynczego stringna części to bardzo szczegółowa operacja.

Oto sposób, aby to zrobić:

using static MoreLinq.Extensions.BatchExtension;

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});

var parser = new TransformBlock<string[], string[][]>(lines =>
{
    return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

W tym przykładzie użyto Batchoperatora z MoreLinqpakietu, aby przekazać wiersze w partiach po 100, zamiast przekazywać je jeden po drugim. Inne opcje dozowania znajdziesz tutaj .


Aktualizacja: Jeszcze jedną sugestią jest zwiększenie minimalnej liczby wątków, które ThreadPooltworzy na żądanie ( SetMinThreads). W przeciwnym razie konfiguracja ThreadPoolzostanie natychmiast nasycona MaxDegreeOfParallelism = Environment.ProcessorCount, co spowoduje niewielkie, ale zauważalne (500 ms) opóźnienia, z powodu celowego lenistwa ThreadPoolalgorytmu wstrzykiwania wątków.

ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
    Environment.ProcessorCount * 2);

Wystarczy raz wywołać tę metodę na starcie programu.