TPL Dataflow traite chaque fichier de manière synchrone mais chaque ligne d'un fichier de manière asynchrone
Mon cas d'utilisation m'oblige donc à traiter une liste de fichiers, où pour chaque fichier de la liste, je passe par chaque ligne et fais des calculs sur ces lignes. Maintenant, mon problème est que je ne peux pas avoir plusieurs lignes de fichiers dans mon bloc tampon, donc je dois essentiellement m'assurer qu'un fichier est complètement traité (via une série de blocs de flux de données), avant même d'entrer le deuxième fichier.
Maintenant, j'ai regardé TPL DataFlow un par un traitement où la réponse dit soit d'arrêter complètement d'utiliser tpl dataflow ou d'encapsuler plusieurs blocs de traitement en un seul afin que je puisse le contrôler. Mais si je fais cela, je perdrais la "composabilité" fournie par tpl, il semble également un peu inutile de regrouper des blocs indépendants. Y a-t-il une autre façon de faire cela?
J'ai pensé à utiliser OutputAvailableAsync au niveau du nœud feuille pour me notifier lorsque tout a été vidé avant de publier dans un autre fichier. Mais je ne pouvais pas du tout faire fonctionner OutputAvailableAsync. Cela attend une éternité.
ÉDITER
Dans le pipeline, j'aurais un bloc d'action avec état, pour lequel je prévois d'utiliser un ConcurrentDictionary (pour chaque ligne d'un fichier, j'ai plusieurs choses à noter). Maintenant, je ne peux pas indexer chaque ligne parce que cela signifierait que je devrais garder l'état pour le nombre N de fichiers traités ensemble. Ici, N serait probablement le nombre de fichiers à traiter.
C'est ce que j'ai pour le moment, gardez à l'esprit que je viens de coder une preuve de concept.
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
Réponses
Vous pouvez tirer parti des capacités de liaison conditionnelle du TPL Dataflow, afin de créer un pipeline partiellement partagé et partiellement dédié. Un seul bloc de lecteur et un seul bloc d'analyseur seraient partagés par tous les fichiers, tandis qu'un bloc de processeur dédié sera créé pour chaque fichier. Voici une simple démonstration du concept:
var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
return (line.Id, line.Line?.Split(","));
});
var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
var processor = CreateProcessor(file.Id);
// Create a conditional link from the parser block to the processor block
var link = parser.LinkTo(processor, entry => entry.Id == file.Id);
return File
.ReadLines(file.Path)
.Select(line => (file.Id, line))
.Append((file.Id, null)); // Completion signal
});
ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
var streamWriter = new StreamWriter($@"C:\{fileId}.out");
return new ActionBlock<(int Id, string[] LineParts)>(line =>
{
if (line.LineParts == null)
{
streamWriter.Close(); // Completion signal received
return;
}
streamWriter.WriteLine(String.Join("|", line.LineParts));
});
}
reader.LinkTo(parser);
Dans cet exemple, chaque fichier est associé à un fichier int Id
. Celui-ci Id
est transmis avec chaque ligne, afin de pouvoir reconstruire le fichier en aval. Les tuples de valeur sont utilisés pour combiner chaque élément de données avec le Id
de son fichier d'origine. Un lien conditionnel est créé entre le parser
bloc partagé et chaque processor
bloc dédié . Une null
charge utile est utilisée comme indicateur de fin de fichier. Lors de la réception de ce signal, un processeur block
devrait idéalement se dissocier du parser
, afin de réduire au minimum la surcharge du mécanisme de liaison conditionnelle. La dissociation est effectuée en supprimant le link
retourné par la LinkTo
méthode. Par souci de simplicité, cette étape importante a été omise de l'exemple ci-dessus.
Je devrais probablement répéter ici ce que j'ai déjà écrit dans ma réponse à une question connexe précédente , à savoir que le passage de chaînes individuelles d'un bloc à l'autre entraînera une surcharge importante. La segmentation (mise en lots) de la charge de travail est la voie à suivre, afin de garantir que le pipeline fonctionnera aussi bien (sans friction) que possible.