TPL Dataflow는 각 파일을 동 기적으로 처리하지만 파일 내의 각 줄은 비동기 적으로 처리합니다.

Nov 26 2020

따라서 제 사용 사례에서는 파일 목록을 처리해야합니다. 목록의 모든 파일에 대해 각 줄을 살펴보고 해당 줄에서 몇 가지 계산을 수행합니다. 이제 내 문제는 버퍼 블록에 여러 파일 줄을 가질 수 없기 때문에 기본적으로 두 번째 파일을 입력하기 전에 하나의 파일이 (일련의 데이터 흐름 블록을 통해) 완전히 처리되었는지 확인해야합니다.

이제는 TPL 데이터 흐름 사용을 완전히 중지하거나 여러 처리 블록을 하나로 캡슐화하여 제어 할 수 있는 TPL 데이터 흐름 을 하나씩 살펴 보았습니다 . 그러나 그렇게한다면 tpl이 제공하는 "구성 성"을 잃게 될 것입니다. 또한 독립적 인 블록을 함께 묶는 것도 약간 낭비적인 것 같습니다. 이 작업을 수행하는 다른 방법이 있습니까?

리프 노드에서 OutputAvailableAsync를 사용하여 다른 파일에 게시하기 전에 모든 것이 플러시되면 알려주는 것을 생각했습니다. 하지만 OutputAvailableAsync가 전혀 작동하지 않았습니다. 영원히 기다립니다.

편집하다

파이프 라인 아래에는 ConcurrentDictionary를 사용할 계획 인 상태가있는 actionblock이 있습니다 (파일의 각 줄에 대해 여러 가지 참고 사항이 있습니다). 이제는 N 개의 파일이 함께 처리되는 상태를 유지해야하기 때문에 각 줄을 인덱싱 할 수 없습니다. 여기서 N은 처리 할 파일 수입니다.

이것이 제가 지금 가지고있는 것입니다. 방금 개념 증명을 코딩했습니다.

        static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

답변

1 TheodorZoulias Nov 28 2020 at 02:13

부분적으로 공유되고 부분적으로 전용되는 파이프 라인을 생성하기 위해 TPL Dataflow 의 조건부 연결 기능 을 활용할 수 있습니다 . 단일 리더 블록과 단일 파서 블록은 모든 파일에서 공유되는 반면 전용 프로세서 블록은 각 파일에 대해 생성됩니다. 다음은 개념에 대한 간단한 데모입니다.

var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
    return (line.Id, line.Line?.Split(","));
});

var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
    var processor = CreateProcessor(file.Id);

    // Create a conditional link from the parser block to the processor block
    var link = parser.LinkTo(processor, entry => entry.Id == file.Id);

    return File
        .ReadLines(file.Path)
        .Select(line => (file.Id, line))
        .Append((file.Id, null)); // Completion signal
});

ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
    var streamWriter = new StreamWriter($@"C:\{fileId}.out");

    return new ActionBlock<(int Id, string[] LineParts)>(line =>
    {
        if (line.LineParts == null)
        {
            streamWriter.Close(); // Completion signal received
            return;
        }
        streamWriter.WriteLine(String.Join("|", line.LineParts));
    });
}

reader.LinkTo(parser);

이 예에서 각 파일은 int Id. 이것은 Id하류 파일을 재구성 할 수 있도록하기 위해, 각 라인과 함께 전달된다. 값 튜플 은 각 데이터 조각을 Id원래 파일 과 결합하는 데 사용 됩니다. 공유 parser블록과 각 전용 processor블록 사이에 조건부 링크가 생성 됩니다. null페이로드 파일 끝 지표로 사용된다. 이 신호를 수신하면 프로세서 blockparser조건부 연결 메커니즘의 오버 헤드를 최소로 유지하기 위해 이상적으로에서 자체 연결을 해제해야합니다 . 연결 해제는 메서드 에서 link반환 된을 삭제하여 수행됩니다 LinkTo. 단순화를 위해이 중요한 단계는 위의 예에서 생략되었습니다.

이전 관련 질문 에서 이미 작성한 답변을 반복해야합니다. 개별 문자열을 블록에서 블록으로 전달하면 상당한 오버 헤드가 발생합니다. 파이프 라인이 가능한 한 원활하게 (마찰이없는) 작동하도록하려면 워크로드를 청크 화 (일괄 처리)하는 것이 좋습니다.