TPL Dataflow procesa cada archivo de forma sincrónica pero cada línea dentro de un archivo de forma asincrónica

Nov 26 2020

Entonces, mi caso de uso requiere que procese una lista de archivos, donde para cada archivo en la lista, reviso cada línea y hago algunos cálculos en esas líneas. Ahora mi problema es que no puedo tener líneas de varios archivos en mi bloque de búfer, por lo que básicamente necesito asegurarme de que un archivo esté completamente procesado (a través de una serie de bloques de flujo de datos), incluso antes de ingresar al segundo archivo.

Ahora miré el procesamiento de TPL DataFlow uno por uno, donde la respuesta dice dejar de usar el flujo de datos de tpl por completo o encapsular múltiples bloques de procesamiento en uno para poder controlarlo. Pero si hago eso, perdería la "capacidad de composición" que proporciona tpl, también parece un poco derrochador agrupar bloques independientes. ¿Hay alguna otra forma de hacer esto?

Pensé en usar OutputAvailableAsync en el nodo hoja para notificarme cuando todo se haya eliminado antes de publicar en otro archivo. Pero no pude hacer que OutputAvailableAsync funcionara en absoluto. Simplemente espera para siempre.

EDITAR

Más adelante, tendría un bloque de acción con estado, para el cual planeo usar un ConcurrentDictionary (para cada línea en un archivo, tengo varias cosas a destacar). Ahora no puedo indexar cada línea porque eso significaría que tendría que mantener el estado para N número de archivos que se procesan juntos. Aquí N probablemente sería el número de archivos a procesar.

Esto es lo que tengo por ahora, tenga en cuenta que acabo de codificar una prueba de concepto.

        static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

Respuestas

1 TheodorZoulias Nov 28 2020 at 02:13

Puede aprovechar las capacidades de vinculación condicional del flujo de datos de TPL para crear una canalización que sea parcialmente compartida y parcialmente dedicada. Todos los archivos compartirían un solo bloque de lector y un solo bloque de analizador, mientras que se creará un bloque de procesador dedicado para cada archivo. Aquí hay una demostración simple del concepto:

var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
    return (line.Id, line.Line?.Split(","));
});

var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
    var processor = CreateProcessor(file.Id);

    // Create a conditional link from the parser block to the processor block
    var link = parser.LinkTo(processor, entry => entry.Id == file.Id);

    return File
        .ReadLines(file.Path)
        .Select(line => (file.Id, line))
        .Append((file.Id, null)); // Completion signal
});

ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
    var streamWriter = new StreamWriter($@"C:\{fileId}.out");

    return new ActionBlock<(int Id, string[] LineParts)>(line =>
    {
        if (line.LineParts == null)
        {
            streamWriter.Close(); // Completion signal received
            return;
        }
        streamWriter.WriteLine(String.Join("|", line.LineParts));
    });
}

reader.LinkTo(parser);

En este ejemplo, cada archivo está asociado con un int Id. Esto Idse pasa junto con cada línea, para poder reconstruir el archivo en sentido descendente. Las tuplas de valor se utilizan para combinar cada dato con el Idde su archivo originado. Se crea un enlace condicional entre el parserbloque compartido y cada processorbloque dedicado . Una nullcarga útil se utiliza como indicador de fin de archivo. Al recibir esta señal, un procesador blockidealmente debería desvincularse del parser, para mantener la sobrecarga del mecanismo de enlace condicional al mínimo. La desvinculación se realiza eliminando lo linkdevuelto por el LinkTométodo. En aras de la simplicidad, este importante paso se ha omitido del ejemplo anterior.

Probablemente debería repetir aquí lo que ya escribí en mi respuesta en una pregunta relacionada anterior , que pasar cadenas individuales de un bloque a otro resultará en una sobrecarga significativa. El camino a seguir es dividir (agrupar) la carga de trabajo para garantizar que la tubería funcione de la manera más fluida (sin fricciones) posible.