Dlaczego mój potok przepływu danych TPL wolniej odczytuje duże pliki CSV w porównaniu do zwykłego zapętlania?
Więc moim wymaganiem jest odczytanie wielu plików CSV (każdy ma minimum milion wierszy), a następnie przeanalizowanie każdej linii. Obecnie, w taki sposób, w jaki podzieliłem mój potok, najpierw tworzę oddzielny potok, aby po prostu odczytać plik CSV do string[], a następnie planuję później utworzyć potok parsowania.
Ale widząc wyniki mojego potoku odczytu plików, jestem oszołomiony, ponieważ jest znacznie wolniejszy niż zwykłe przechodzenie przez plik CSV, a następnie przechodzenie przez wiersze.
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});
var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
{
using (var fileStream = File.OpenRead(filePath)) {
using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
string line;
while ((line = streamReader.ReadLine()) != null) {
var isCompleted = await lineBufferBlock.SendAsync(line);
while (!isCompleted)
{
isCompleted = await lineBufferBlock.SendAsync(line);
}
}
}
}
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});
fileReadingBlock.Completion.ContinueWith((task) =>
{
lineBufferBlock.Complete();
});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
A potem w końcu konsumuję to w następujący sposób
for (int i = 1; i < 5; i++) {
var filePath = $"C:\\Users\\File{i}.csv";
fileReadingPipeline.SendAsync(filePath);
}
fileReadingPipeline.Complete();
while (true) {
try {
var outputRows = fileReadingPipeline.Receive();
foreach (string word in outputRows)
{
}
}
catch (InvalidOperationException e) {
break;
}
}
Natomiast mój kod prostej pętli wygląda następująco:
for (int i = 1; i < 5; i++) {
var filePath = $"C:\\Users\\File{i}.csv";
foreach (string row in File.ReadLines(filePath))
{
foreach (string word in row.Split(","))
{
}
}
}
Różnica w wydajności sprowadza się do ~15 sekund dla TPL Dataflow, podczas gdy jest to ~5s dla kodu zapętlonego.
EDYTOWAĆ
Za lepszą radą z komentarzy usunąłem niepotrzebny lineBufferBlock z potoku i to jest teraz mój kod. Jednak wydajność nadal pozostaje taka sama.
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
Odpowiedzi
Podczas konfigurowania potoku należy mieć na uwadze możliwości sprzętu, który ma wykonać zadanie. Przepływ danych TPL nie wykonuje zadania sam, deleguje je do procesora, dysku twardego/SSD, karty sieciowej itp. Na przykład podczas odczytu plików z dysku twardego prawdopodobnie nie ma sensu nakazywać TPL odczytywania danych z 8 plików jednocześnie, ponieważ głowica mechanicznego ramienia HDD nie może być fizycznie zlokalizowana w 8 miejscach jednocześnie. Sprowadza się to do tego, że odczytywanie plików z systemów plików nie jest szczególnie przyjazne dla równoległego. W przypadku dysków SSD jest nieco lepiej, ale trzeba będzie to przetestować w każdym przypadku.
Inną kwestią związaną z równoległością jest ziarnistość. Chcesz, aby obciążenie było duże, a nie szczegółowe. W przeciwnym razie koszt przekazywania komunikatów z bufora do bufora i umieszczanie barier pamięci wokół każdego transferu w celu zapewnienia widoczności między wątkami może negować wszelkie korzyści, jakich można oczekiwać od zastosowania równoległości. Wskazówka: dzielenie pojedynczego string
na części to bardzo szczegółowa operacja.
Oto sposób, aby to zrobić:
using static MoreLinq.Extensions.BatchExtension;
var reader = new TransformManyBlock<string, string[]>(filePath =>
{
return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
});
var parser = new TransformBlock<string[], string[][]>(lines =>
{
return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
});
reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });
W tym przykładzie użyto Batchoperatora z MoreLinqpakietu, aby przekazać wiersze w partiach po 100, zamiast przekazywać je jeden po drugim. Inne opcje dozowania znajdziesz tutaj .
Aktualizacja: Jeszcze jedną sugestią jest zwiększenie minimalnej liczby wątków, które ThreadPool
tworzy na żądanie ( SetMinThreads). W przeciwnym razie konfiguracja ThreadPool
zostanie natychmiast nasycona MaxDegreeOfParallelism = Environment.ProcessorCount
, co spowoduje niewielkie, ale zauważalne (500 ms) opóźnienia, z powodu celowego lenistwa ThreadPool
algorytmu wstrzykiwania wątków.
ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
Environment.ProcessorCount * 2);
Wystarczy raz wywołać tę metodę na starcie programu.