내 TPL Dataflow Pipeline이 단순한 루핑에 비해 대용량 CSV 파일을 읽는 속도가 느린 이유는 무엇입니까?

Nov 25 2020

따라서 내 요구 사항은 여러 CSV 파일 (각각 최소 백만 개의 행이 있음)을 읽고 각 줄을 구문 분석하는 것입니다. 현재 파이프 라인을 분리 한 방식으로 먼저 CSV 파일을 string []으로 읽어들이는 별도의 파이프 라인을 생성 한 다음 나중에 파싱 파이프 라인을 생성 할 계획입니다.

그러나 파일 읽기 파이프 라인의 결과를 보면 CSV 파일을 반복 한 다음 행을 반복하는 것보다 훨씬 느리기 때문에 어리석은 일입니다.

static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});

        var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
        {
            using (var fileStream = File.OpenRead(filePath)) {
                using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
                    string line;
                    while ((line = streamReader.ReadLine()) != null) {
                        var isCompleted = await lineBufferBlock.SendAsync(line);
                        while (!isCompleted)
                        {
                            isCompleted = await lineBufferBlock.SendAsync(line);
                        }
                    }
                }
            }
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        fileReadingBlock.Completion.ContinueWith((task) =>
        {
            lineBufferBlock.Complete();
        });

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

그리고 마침내 다음과 같이 소비합니다

        for (int i = 1; i < 5; i++) {
            var filePath = $"C:\\Users\\File{i}.csv";
            fileReadingPipeline.SendAsync(filePath);
        }
        fileReadingPipeline.Complete();
        while (true) {
            try {
                var outputRows = fileReadingPipeline.Receive();
                foreach (string word in outputRows)
                {

                }
            }
            catch (InvalidOperationException e) {
                break;
            }
        }

내 직선 루프 코드는 다음과 같습니다.

        for (int i = 1; i < 5; i++) {

            var filePath = $"C:\\Users\\File{i}.csv";
            foreach (string row in File.ReadLines(filePath))
            {
                foreach (string word in row.Split(","))
                {

                }

            }

        }

성능 차이는 TPL Dataflow의 경우 ~ 15 초로 내려 가고 루핑 코드의 경우 ~ 5 초입니다.

편집하다

주석의 더 나은 조언에 따라 파이프 라인에서 불필요한 lineBufferBlock을 제거했으며 이제 이것이 제 코드입니다. 그러나 성능은 여전히 ​​동일합니다.

            var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

답변

TheodorZoulias Nov 26 2020 at 00:41

파이프 라인을 구성 할 때 작업을 수행 할 하드웨어의 기능을 염두에 두어야합니다. TPL Dataflow는 그 자체로 작업을 수행하지 않고 CPU, HDD / SSD, 네트워크 카드 등에 위임합니다. 예를 들어 하드 디스크에서 파일을 읽을 때 TPL에 데이터를 읽도록 지시하는 것은 쓸모가 없습니다. 8 개의 파일이 동시에 발생하기 때문에 HDD의 기계식 암 헤드는 물리적으로 동시에 8 개 위치에 위치 할 수 없습니다. 이것은 파일 시스템에서 파일을 읽는 것이 특히 병렬 친화적이지 않다는 사실로 귀결됩니다. SSD의 경우 약간 더 좋지만 사례별로 테스트해야합니다.

병렬화의 또 다른 문제는 세분성입니다. 워크로드가 세분화되지 않고 덩어리가되기를 원합니다. 그렇지 않으면 버퍼에서 버퍼로 메시지를 전달하고 스레드 간 가시성을 보장하기 위해 각 전송 주위에 메모리 장벽을 두는 비용이 병렬 처리를 사용하여 기대할 수있는 모든 이점을 무효화 할 수 있습니다. 팁 : 단일 string을 부분으로 분할하는 것은 매우 세분화 된 작업입니다.

이를 수행하는 방법은 다음과 같습니다.

using static MoreLinq.Extensions.BatchExtension;

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});

var parser = new TransformBlock<string[], string[][]>(lines =>
{
    return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

이 예제에서는 패키지 의 Batch연산자를 사용하여 MoreLinq줄을 하나씩 전달하는 대신 100 개의 일괄 처리로 줄을 전달합니다. 여기에서 다른 일괄 처리 옵션을 찾을 수 있습니다 .


업데이트 : 한 가지 더 제안은 ThreadPool요청시 생성 되는 최소 스레드 수 ( SetMinThreads) 를 늘리는 것 입니다. 그렇지 않으면 의 스레드 삽입 알고리즘의 의도적 인 게으름으로 인해 작지만 눈에 띄는 (500msec) 지연이 발생 ThreadPool하는 MaxDegreeOfParallelism = Environment.ProcessorCount구성에 의해 즉시 포화됩니다 ThreadPool.

ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
    Environment.ProcessorCount * 2);

프로그램 시작시이 메서드를 한 번만 호출하면됩니다.