Warum liest meine TPL-Dataflow-Pipeline große CSV-Dateien langsamer als nur Schleifen?

Nov 25 2020

Meine Anforderung besteht also darin, mehrere CSV-Dateien (jede mit mindestens einer Million Zeilen) zu lesen und dann jede Zeile zu parsen. Derzeit habe ich meine Pipeline so aufgeteilt, dass ich zuerst eine separate Pipeline erstelle, um einfach eine CSV-Datei in einen String[] einzulesen, und dann plane ich, die Parsing-Pipeline später zu erstellen.

Aber wenn ich die Ergebnisse meiner File Reading Pipeline sehe, bin ich verblüfft, weil sie erheblich langsamer ist, als nur die CSV-Datei und dann die Zeilen zu durchlaufen.

static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});

        var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
        {
            using (var fileStream = File.OpenRead(filePath)) {
                using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
                    string line;
                    while ((line = streamReader.ReadLine()) != null) {
                        var isCompleted = await lineBufferBlock.SendAsync(line);
                        while (!isCompleted)
                        {
                            isCompleted = await lineBufferBlock.SendAsync(line);
                        }
                    }
                }
            }
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        fileReadingBlock.Completion.ContinueWith((task) =>
        {
            lineBufferBlock.Complete();
        });

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

Und dann konsumiere ich es endlich wie folgt

        for (int i = 1; i < 5; i++) {
            var filePath = $"C:\\Users\\File{i}.csv";
            fileReadingPipeline.SendAsync(filePath);
        }
        fileReadingPipeline.Complete();
        while (true) {
            try {
                var outputRows = fileReadingPipeline.Receive();
                foreach (string word in outputRows)
                {

                }
            }
            catch (InvalidOperationException e) {
                break;
            }
        }

Während mein gerader Schleifencode der folgende ist:

        for (int i = 1; i < 5; i++) {

            var filePath = $"C:\\Users\\File{i}.csv";
            foreach (string row in File.ReadLines(filePath))
            {
                foreach (string word in row.Split(","))
                {

                }

            }

        }

Der Leistungsunterschied beträgt für TPL Dataflow ~15 Sekunden, während er für den Schleifencode ~5s beträgt.

BEARBEITEN

Auf besseren Rat aus den Kommentaren habe ich den unnötigen lineBufferBlock aus der Pipeline entfernt und dies ist jetzt mein Code. Die Leistung bleibt jedoch gleich.

            var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

Antworten

TheodorZoulias Nov 26 2020 at 00:41

Wenn Sie eine Pipeline konfigurieren, sollten Sie die Fähigkeiten der Hardware berücksichtigen, die die Aufgabe erledigen soll. Der TPL Dataflow erledigt die Arbeit nicht von selbst, er delegiert ihn an die CPU, die HDD/SSD, die Netzwerkkarte usw. Wenn Sie beispielsweise Dateien von einer Festplatte lesen, ist es wahrscheinlich sinnlos, die TPL anzuweisen, Daten zu lesen 8 Dateien gleichzeitig, da sich der Kopf des mechanischen Arms der Festplatte nicht an 8 Orten gleichzeitig befinden kann. Dies läuft darauf hinaus, dass das Lesen von Dateien aus Dateisystemen nicht besonders parallel-freundlich ist. Bei SSDs ist es etwas besser, aber Sie müssen es von Fall zu Fall testen.

Ein weiteres Problem bei der Parallelisierung ist die Granularität. Sie möchten, dass die Arbeitslast klumpig und nicht granular ist. Andernfalls können die Kosten für das Weiterleiten von Nachrichten von Puffer zu Puffer und das Aufstellen von Speicherbarrieren um jede Übertragung, um die Cross-Thread-Sichtbarkeit zu gewährleisten, alle Vorteile zunichte machen, die Sie von der Verwendung von Parallelität erwarten. Tipp: Das Aufteilen einer Single stringin Teile ist ein sehr granularer Vorgang.

Hier ist eine Möglichkeit, dies zu tun:

using static MoreLinq.Extensions.BatchExtension;

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});

var parser = new TransformBlock<string[], string[][]>(lines =>
{
    return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

In diesem Beispiel wird der BatchOperator aus dem MoreLinqPaket verwendet, um die Zeilen in Stapeln von 100 weiterzugeben, anstatt sie einzeln zu übergeben. Weitere Batching-Optionen finden Sie hier .


Update: Ein weiterer Vorschlag ist, die Mindestanzahl von Threads zu erhöhen, die ThreadPoolbei Bedarf erstellt werden ( SetMinThreads). Andernfalls ThreadPoolwird das sofort von der MaxDegreeOfParallelism = Environment.ProcessorCountKonfiguration gesättigt , was aufgrund der absichtlichen Trägheit des ThreadPoolThread-Injektionsalgorithmus von .

ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
    Environment.ProcessorCount * 2);

Es genügt, diese Methode einmal beim Start des Programms aufzurufen.