TPL Dataflow procesa cada archivo de forma sincrónica pero cada línea dentro de un archivo de forma asincrónica
Entonces, mi caso de uso requiere que procese una lista de archivos, donde para cada archivo en la lista, reviso cada línea y hago algunos cálculos en esas líneas. Ahora mi problema es que no puedo tener líneas de varios archivos en mi bloque de búfer, por lo que básicamente necesito asegurarme de que un archivo esté completamente procesado (a través de una serie de bloques de flujo de datos), incluso antes de ingresar al segundo archivo.
Ahora miré el procesamiento de TPL DataFlow uno por uno, donde la respuesta dice dejar de usar el flujo de datos de tpl por completo o encapsular múltiples bloques de procesamiento en uno para poder controlarlo. Pero si hago eso, perdería la "capacidad de composición" que proporciona tpl, también parece un poco derrochador agrupar bloques independientes. ¿Hay alguna otra forma de hacer esto?
Pensé en usar OutputAvailableAsync en el nodo hoja para notificarme cuando todo se haya eliminado antes de publicar en otro archivo. Pero no pude hacer que OutputAvailableAsync funcionara en absoluto. Simplemente espera para siempre.
EDITAR
Más adelante, tendría un bloque de acción con estado, para el cual planeo usar un ConcurrentDictionary (para cada línea en un archivo, tengo varias cosas a destacar). Ahora no puedo indexar cada línea porque eso significaría que tendría que mantener el estado para N número de archivos que se procesan juntos. Aquí N probablemente sería el número de archivos a procesar.
Esto es lo que tengo por ahora, tenga en cuenta que acabo de codificar una prueba de concepto.
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
Respuestas
Puede aprovechar las capacidades de vinculación condicional del flujo de datos de TPL para crear una canalización que sea parcialmente compartida y parcialmente dedicada. Todos los archivos compartirían un solo bloque de lector y un solo bloque de analizador, mientras que se creará un bloque de procesador dedicado para cada archivo. Aquí hay una demostración simple del concepto:
var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
return (line.Id, line.Line?.Split(","));
});
var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
var processor = CreateProcessor(file.Id);
// Create a conditional link from the parser block to the processor block
var link = parser.LinkTo(processor, entry => entry.Id == file.Id);
return File
.ReadLines(file.Path)
.Select(line => (file.Id, line))
.Append((file.Id, null)); // Completion signal
});
ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
var streamWriter = new StreamWriter($@"C:\{fileId}.out");
return new ActionBlock<(int Id, string[] LineParts)>(line =>
{
if (line.LineParts == null)
{
streamWriter.Close(); // Completion signal received
return;
}
streamWriter.WriteLine(String.Join("|", line.LineParts));
});
}
reader.LinkTo(parser);
En este ejemplo, cada archivo está asociado con un int Id
. Esto Id
se pasa junto con cada línea, para poder reconstruir el archivo en sentido descendente. Las tuplas de valor se utilizan para combinar cada dato con el Id
de su archivo originado. Se crea un enlace condicional entre el parser
bloque compartido y cada processor
bloque dedicado . Una null
carga útil se utiliza como indicador de fin de archivo. Al recibir esta señal, un procesador block
idealmente debería desvincularse del parser
, para mantener la sobrecarga del mecanismo de enlace condicional al mínimo. La desvinculación se realiza eliminando lo link
devuelto por el LinkTo
método. En aras de la simplicidad, este importante paso se ha omitido del ejemplo anterior.
Probablemente debería repetir aquí lo que ya escribí en mi respuesta en una pregunta relacionada anterior , que pasar cadenas individuales de un bloque a otro resultará en una sobrecarga significativa. El camino a seguir es dividir (agrupar) la carga de trabajo para garantizar que la tubería funcione de la manera más fluida (sin fricciones) posible.