TPL Dataflow przetwarza każdy plik synchronicznie, ale każdy wiersz w pliku jest asynchroniczny
Tak więc mój przypadek użycia wymaga przetworzenia listy plików, gdzie dla każdego pliku na liście przechodzę przez każdą linię i wykonuję obliczenia w tej linii. Teraz moim problemem jest to, że nie mogę mieć wielu wierszy plików w moim bloku bufora, więc zasadniczo muszę upewnić się, że jeden plik jest całkowicie przetworzony (przez serię bloków przepływu danych), zanim jeszcze wejdę do drugiego pliku.
Teraz przyjrzałem się przetwarzaniu TPL DataFlow Jeden po drugim, w którym odpowiedź mówi, że należy całkowicie zaprzestać używania przepływu danych tpl lub zamknąć wiele bloków przetwarzania w jeden, aby móc go kontrolować. Ale jeśli to zrobię, straciłbym „kompozowalność”, którą zapewnia tpl, ale też zbrylanie w niezależnych blokach wydaje się trochę marnotrawne. Czy jest jakiś inny sposób, aby to zrobić?
Pomyślałem o użyciu OutputAvailableAsync w węźle liścia, aby powiadomić mnie, gdy wszystko zostało opróżnione, zanim opublikuję inny plik. Ale nie mogłem w ogóle zmusić OutputAvailableAsync do działania. Po prostu czeka wiecznie.
EDYTOWAĆ
W dalszej części potoku miałbym blok akcji ze stanem, dla którego planuję użyć ConcurrentDictionary (dla każdego wiersza w pliku mam wiele rzeczy wartych uwagi). Teraz nie mogę prawdopodobnie zindeksować każdego wiersza, ponieważ oznaczałoby to, że musiałbym zachować stan dla N liczby przetwarzanych razem plików. W tym przypadku N prawdopodobnie oznaczałoby liczbę plików do przetworzenia.
Na razie to mam, pamiętaj, że właśnie zakodowałem dowód słuszności koncepcji.
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
Odpowiedzi
Możesz skorzystać z możliwości warunkowego łączenia w TPL Dataflow, aby utworzyć potok, który jest częściowo współużytkowany i częściowo dedykowany. Pojedynczy blok czytnika i jeden blok parsera byłyby współużytkowane przez wszystkie pliki, podczas gdy dla każdego pliku zostanie utworzony dedykowany blok procesora. Oto prosta demonstracja koncepcji:
var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
return (line.Id, line.Line?.Split(","));
});
var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
var processor = CreateProcessor(file.Id);
// Create a conditional link from the parser block to the processor block
var link = parser.LinkTo(processor, entry => entry.Id == file.Id);
return File
.ReadLines(file.Path)
.Select(line => (file.Id, line))
.Append((file.Id, null)); // Completion signal
});
ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
var streamWriter = new StreamWriter($@"C:\{fileId}.out");
return new ActionBlock<(int Id, string[] LineParts)>(line =>
{
if (line.LineParts == null)
{
streamWriter.Close(); // Completion signal received
return;
}
streamWriter.WriteLine(String.Join("|", line.LineParts));
});
}
reader.LinkTo(parser);
W tym przykładzie każdy plik jest powiązany z plikiem int Id
. Jest Id
to przekazywane wraz z każdą linią, aby móc zrekonstruować plik w dół. Krotki wartości służą do łączenia każdego fragmentu danych z Id
plikiem, z którego pochodzi. Między wspólnym parser
blokiem a każdym dedykowanym processor
blokiem tworzone jest łącze warunkowe . null
Ładowność jest używany jako wskaźnik EOF. Po odebraniu tego sygnału, block
najlepiej parser
byłoby, gdyby procesor odłączył się od niego , aby ograniczyć do minimum obciążenie mechanizmu warunkowego łączenia. Odłączenie odbywa się poprzez usunięcie link
zwróconego przez LinkTo
metodę. Dla uproszczenia ten ważny krok został pominięty w powyższym przykładzie.
Powinienem chyba powtórzyć to, co już napisałem w mojej odpowiedzi w poprzednim pokrewnym pytaniu , że przekazywanie poszczególnych ciągów z bloku do bloku będzie skutkowało znacznym narzutem. Aby zapewnić możliwie płynną (wolną od tarcia) pracę rurociągu, należy zastosować chunkifing (wsadowy) obciążenie pracą.