TPLデータフローパイプラインが、ループするだけの場合と比較して、巨大なCSVファイルの読み取りが遅いのはなぜですか?

Nov 25 2020

したがって、私の要件は、複数のCSVファイル(それぞれが最低100万行)を読み取ってから、各行を解析することです。現在、パイプラインを分割した方法では、最初に別のパイプラインを作成して、CSVファイルをstring []に読み込むだけで、後で解析パイプラインを作成する予定です。

しかし、ファイル読み取りパイプラインの結果を見ると、CSVファイルをループしてから行をループするよりもかなり遅いので、私は唖然とします。

static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});

        var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
        {
            using (var fileStream = File.OpenRead(filePath)) {
                using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
                    string line;
                    while ((line = streamReader.ReadLine()) != null) {
                        var isCompleted = await lineBufferBlock.SendAsync(line);
                        while (!isCompleted)
                        {
                            isCompleted = await lineBufferBlock.SendAsync(line);
                        }
                    }
                }
            }
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        fileReadingBlock.Completion.ContinueWith((task) =>
        {
            lineBufferBlock.Complete();
        });

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

そしてついに次のように消費します

        for (int i = 1; i < 5; i++) {
            var filePath = $"C:\\Users\\File{i}.csv";
            fileReadingPipeline.SendAsync(filePath);
        }
        fileReadingPipeline.Complete();
        while (true) {
            try {
                var outputRows = fileReadingPipeline.Receive();
                foreach (string word in outputRows)
                {

                }
            }
            catch (InvalidOperationException e) {
                break;
            }
        }

私のストレートループコードは次のとおりですが:

        for (int i = 1; i < 5; i++) {

            var filePath = $"C:\\Users\\File{i}.csv";
            foreach (string row in File.ReadLines(filePath))
            {
                foreach (string word in row.Split(","))
                {

                }

            }

        }

パフォーマンスの違いは、TPLデータフローでは最大15秒ですが、ループコードでは最大5秒です。

編集

コメントからのより良いアドバイスで、私はパイプラインから不要なlineBufferBlockを削除しました、そしてこれは今私のコードです。ただし、パフォーマンスは同じままです。

            var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

回答

TheodorZoulias Nov 26 2020 at 00:41

パイプラインを構成するときは、ジョブを実行するハードウェアの機能を念頭に置く必要があります。TPLデータフローはそれ自体ではジョブを実行せず、CPU、HDD / SSD、ネットワークカードなどに委任します。たとえば、ハードディスクからファイルを読み取る場合、TPLにデータの読み取りを指示するのはおそらく無駄です。 HDDのメカニカルアームのヘッドを物理的に同時に8箇所に配置することはできないため、同時に8ファイル。これは、ファイルシステムからのファイルの読み取りが特に並列処理に適しているわけではないという事実に要約されます。SSDの場合は少し優れていますが、ケースバイケースでテストする必要があります。

並列化に関するもう1つの問題は、粒度です。ワークロードを細かくするのではなく、分厚いも​​のにする必要があります。そうしないと、メッセージをバッファからバッファに渡し、各転送の周囲にメモリバリアを配置してクロススレッドの可視性を確保するコストが、並列処理を採用することで期待できるメリットを打ち消す可能性があります。ヒント:単一stringをパーツに分割することは、非常にきめ細かい操作です。

これを行う方法は次のとおりです。

using static MoreLinq.Extensions.BatchExtension;

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});

var parser = new TransformBlock<string[], string[][]>(lines =>
{
    return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

この例では、パッケージのBatch演算子を使用してMoreLinq、行を1つずつ渡すのではなく、100のバッチで行を渡します。ここで他のバッチオプションを見つけることができます。


更新:もう1つの提案はThreadPool、オンデマンドで作成するスレッドの最小数を増やすことです(SetMinThreads)。そうしないThreadPoolと、はMaxDegreeOfParallelism = Environment.ProcessorCount構成によってすぐに飽和状態になり、ThreadPoolスレッドインジェクションアルゴリズムの意図的な遅延のために、小さいながらも目立つ(500ミリ秒)遅延が発生します。

ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
    Environment.ProcessorCount * 2);

プログラムの開始時にこのメソッドを1回呼び出すだけで十分です。