TPL Dataflow verarbeitet jede Datei synchron, aber jede Zeile innerhalb einer Datei asynchron

Nov 26 2020

Mein Anwendungsfall erfordert daher, dass ich eine Liste von Dateien verarbeite, wobei ich für jede Datei in der Liste jede Zeile durchlaufe und einige Berechnungen in dieser Zeile durchführe. Jetzt besteht mein Problem darin, dass ich nicht mehrere Dateizeilen in meinem Pufferblock haben kann. Daher muss ich grundsätzlich sicherstellen, dass eine Datei vollständig verarbeitet wird (über eine Reihe von Datenflussblöcken), bevor ich überhaupt die zweite Datei eingebe.

Jetzt habe ich mir TPL DataFlow One-by-One-Verarbeitung angesehen, bei der die Antwort lautet, entweder die Verwendung des tpl-Datenflusses ganz einzustellen oder mehrere Verarbeitungsblöcke in einem zu kapseln, damit ich ihn steuern kann. Aber wenn ich das tue, würde ich die "Kompositionsfähigkeit" verlieren, die tpl bietet, es scheint auch ein bisschen verschwenderisch, unabhängige Blöcke zusammen zu werfen. Gibt es eine andere Möglichkeit, dies zu tun?

Ich dachte daran, OutputAvailableAsync am Blattknoten zu verwenden, um mich zu benachrichtigen, wenn alles gelöscht wurde, bevor ich in einer anderen Datei poste. Aber ich konnte OutputAvailableAsync überhaupt nicht zum Laufen bringen. Es wartet nur für immer.

BEARBEITEN

In der Pipeline hätte ich einen Aktionsblock mit Status, für den ich ein ConcurrentDictionary verwenden möchte (für jede Zeile in einer Datei habe ich mehrere wichtige Dinge). Jetzt kann ich unmöglich jede Zeile indizieren, da dies bedeuten würde, dass ich den Status für N Anzahl der Dateien, die zusammen verarbeitet werden, beibehalten müsste. Hier wäre N wahrscheinlich die Anzahl der zu verarbeitenden Dateien.

Dies ist, was ich jetzt habe, denken Sie daran, dass ich gerade einen Proof of Concept codiert habe.

        static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

Antworten

1 TheodorZoulias Nov 28 2020 at 02:13

Sie können die bedingten Verknüpfungsfunktionen des TPL-Datenflusses nutzen, um eine Pipeline zu erstellen, die teilweise gemeinsam genutzt und teilweise dediziert wird. Ein einzelner Leseblock und ein einzelner Parserblock werden von allen Dateien gemeinsam genutzt, während für jede Datei ein dedizierter Prozessorblock erstellt wird. Hier ist eine einfache Demonstration des Konzepts:

var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
    return (line.Id, line.Line?.Split(","));
});

var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
    var processor = CreateProcessor(file.Id);

    // Create a conditional link from the parser block to the processor block
    var link = parser.LinkTo(processor, entry => entry.Id == file.Id);

    return File
        .ReadLines(file.Path)
        .Select(line => (file.Id, line))
        .Append((file.Id, null)); // Completion signal
});

ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
    var streamWriter = new StreamWriter($@"C:\{fileId}.out");

    return new ActionBlock<(int Id, string[] LineParts)>(line =>
    {
        if (line.LineParts == null)
        {
            streamWriter.Close(); // Completion signal received
            return;
        }
        streamWriter.WriteLine(String.Join("|", line.LineParts));
    });
}

reader.LinkTo(parser);

In diesem Beispiel ist jede Datei mit einem verknüpft int Id. Dies Idwird zusammen mit jeder Zeile weitergegeben, um die Datei stromabwärts rekonstruieren zu können. Wertetupel werden verwendet, um jedes Datenelement mit dem Idder Ursprungsdatei zu kombinieren . Eine bedingte Verbindung wird zwischen dem gemeinsam genutzten parserBlock und jedem dedizierten processorBlock erstellt. Eine nullNutzlast wird als Indikator für das Dateiende verwendet. Beim Empfang dieses Signals sollte sich ein Prozessor blockidealerweise von der Verbindung trennen parser, um den Overhead des bedingten Verbindungsmechanismus auf ein Minimum zu beschränken. Das Aufheben der Verknüpfung erfolgt durch Entsorgen der linkvon der LinkToMethode zurückgegebenen Daten . Der Einfachheit halber wurde dieser wichtige Schritt im obigen Beispiel weggelassen.

Ich sollte hier wahrscheinlich wiederholen, was ich bereits in meiner Antwort in einer früheren verwandten Frage geschrieben habe , dass das Übergeben einzelner Zeichenfolgen von Block zu Block zu einem erheblichen Overhead führt. Das Chunkifizieren (Batching) der Arbeitslast ist der richtige Weg, um sicherzustellen, dass die Pipeline so reibungslos (reibungslos) wie möglich funktioniert.