Pourquoi mon pipeline TPL Dataflow est-il plus lent à lire des fichiers CSV volumineux par rapport à une simple boucle ?
Mon exigence est donc de lire plusieurs fichiers CSV (chacun ayant au moins un million de lignes), puis d'analyser chaque ligne. Actuellement, la façon dont j'ai décomposé mon pipeline, je crée d'abord un pipeline séparé pour simplement lire un fichier CSV dans une chaîne [], puis je prévois de créer le pipeline d'analyse plus tard.
Mais en voyant les résultats de mon pipeline de lecture de fichiers, je suis abasourdi car il est considérablement plus lent que de simplement parcourir le fichier CSV, puis de parcourir les lignes.
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});
var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
{
using (var fileStream = File.OpenRead(filePath)) {
using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
string line;
while ((line = streamReader.ReadLine()) != null) {
var isCompleted = await lineBufferBlock.SendAsync(line);
while (!isCompleted)
{
isCompleted = await lineBufferBlock.SendAsync(line);
}
}
}
}
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});
fileReadingBlock.Completion.ContinueWith((task) =>
{
lineBufferBlock.Complete();
});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
Et puis je le consomme finalement comme suit
for (int i = 1; i < 5; i++) {
var filePath = $"C:\\Users\\File{i}.csv";
fileReadingPipeline.SendAsync(filePath);
}
fileReadingPipeline.Complete();
while (true) {
try {
var outputRows = fileReadingPipeline.Receive();
foreach (string word in outputRows)
{
}
}
catch (InvalidOperationException e) {
break;
}
}
Alors que mon code de boucle droite est le suivant :
for (int i = 1; i < 5; i++) {
var filePath = $"C:\\Users\\File{i}.csv";
foreach (string row in File.ReadLines(filePath))
{
foreach (string word in row.Split(","))
{
}
}
}
La différence de performances se résume à ~15 secondes pour TPL Dataflow alors qu'elle est de ~5s pour le code en boucle.
ÉDITER
Sur les meilleurs conseils des commentaires, j'ai supprimé le lineBufferBlock inutile du pipeline et voici mon code maintenant. Cependant, les performances restent toujours les mêmes.
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
Réponses
Lorsque vous configurez un pipeline, vous devez avoir à l'esprit les capacités du matériel qui va faire le travail. Le flux de données TPL ne fait pas le travail tout seul, il le délègue au CPU, au disque dur/SSD, à la carte réseau, etc. Par exemple, lors de la lecture de fichiers à partir d'un disque dur, il est probablement inutile de demander au TPL de lire les données depuis 8 fichiers simultanément, car la tête du bras mécanique du disque dur ne peut pas être physiquement localisée à 8 endroits en même temps. Cela se résume au fait que la lecture de fichiers à partir de systèmes de fichiers n'est pas particulièrement adaptée au parallèle. C'est un peu mieux dans le cas des SSD, mais il faudra le tester au cas par cas.
Un autre problème avec la parallélisation est la granularité. Vous voulez que la charge de travail soit volumineuse et non granulaire. Sinon, le coût de transmission des messages d'un tampon à l'autre et la mise en place de barrières mémoire autour de chaque transfert pour assurer la visibilité entre les threads peuvent annuler tous les avantages que vous pouvez attendre de l'utilisation du parallélisme. Astuce : le fractionnement d'un seul string
en plusieurs parties est une opération très granulaire.
Voici une façon de le faire :
using static MoreLinq.Extensions.BatchExtension;
var reader = new TransformManyBlock<string, string[]>(filePath =>
{
return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
});
var parser = new TransformBlock<string[], string[][]>(lines =>
{
return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
});
reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });
Cet exemple utilise l' Batchopérateur du MoreLinqpackage pour passer les lignes par lots de 100, au lieu de les passer une par une. Vous pouvez trouver d'autres options de traitement par lots ici .
Mise à jour : une autre suggestion consiste à augmenter le nombre minimum de threads ThreadPool
créés à la demande ( SetMinThreads). Sinon, le ThreadPool
sera immédiatement saturé par la MaxDegreeOfParallelism = Environment.ProcessorCount
configuration, ce qui entraînera des retards faibles mais perceptibles (500 ms), en raison de la paresse intentionnelle de l' ThreadPool
algorithme d'injection de thread de .
ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
Environment.ProcessorCount * 2);
Il suffit d'appeler cette méthode une fois au démarrage du programme.