TPL Dataflowは各ファイルを同期的に処理しますが、ファイル内の各行は非同期的に処理します
したがって、私のユースケースでは、ファイルのリストを処理する必要があります。リスト内のすべてのファイルについて、各行を調べ、それらの行でいくつかの計算を行います。私の問題は、バッファブロックに複数のファイルの行を含めることができないことです。したがって、2番目のファイルを入力する前に、基本的に1つのファイルが(一連のデータフローブロックを介して)完全に処理されることを確認する必要があります。
ここで、TPL DataFlowを1つずつ確認しました。その答えは、tpl dataflowの使用を完全に停止するか、複数の処理ブロックを1つにカプセル化して制御できるようにすることです。しかし、そうすると、tplが提供する「構成可能性」が失われ、独立したブロックにまとめるのも少し無駄に思えます。これを行う他の方法はありますか?
別のファイルに投稿する前に、リーフノードでOutputAvailableAsyncを使用して、すべてがフラッシュされたときに通知することを考えました。しかし、OutputAvailableAsyncをまったく機能させることができませんでした。それは永遠に待つだけです。
編集
パイプラインの下流には、ConcurrentDictionaryを使用することを計画している状態のアクションブロックがあります(ファイルの各行について、複数の注意事項があります)。これで、各行にインデックスを付けることができなくなります。これは、一緒に処理されているN個のファイルの状態を維持する必要があるためです。ここで、Nはおそらく処理されるファイルの数になります。
これは私が今持っているものです。概念実証をコード化したばかりであることを覚えておいてください。
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
回答
部分的に共有され、部分的に専用のパイプラインを作成するために、TPLデータフローの条件付きリンク機能を利用できます。単一のリーダーブロックと単一のパーサーブロックがすべてのファイルで共有され、ファイルごとに専用のプロセッサブロックが作成されます。コンセプトの簡単なデモンストレーションは次のとおりです。
var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
return (line.Id, line.Line?.Split(","));
});
var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
var processor = CreateProcessor(file.Id);
// Create a conditional link from the parser block to the processor block
var link = parser.LinkTo(processor, entry => entry.Id == file.Id);
return File
.ReadLines(file.Path)
.Select(line => (file.Id, line))
.Append((file.Id, null)); // Completion signal
});
ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
var streamWriter = new StreamWriter($@"C:\{fileId}.out");
return new ActionBlock<(int Id, string[] LineParts)>(line =>
{
if (line.LineParts == null)
{
streamWriter.Close(); // Completion signal received
return;
}
streamWriter.WriteLine(String.Join("|", line.LineParts));
});
}
reader.LinkTo(parser);
この例では、各ファイルはに関連付けられていint Id
ます。これId
は、ファイルをダウンストリームで再構築できるようにするために、各行とともに渡されます。値タプルは、各データをId
元のファイルのデータと組み合わせるために使用されます。共有parser
ブロックと各専用processor
ブロックの間に条件付きリンクが作成されます。null
ペイロードは、エンドオブファイルインジケータとして使用されます。この信号を受信すると、プロセッサは、条件付きリンクメカニズムのオーバーヘッドを最小限に抑えるために、block
理想的には自身をからリンク解除parser
する必要があります。リンク解除はlink
、LinkTo
メソッドによって返されたものを破棄することによって実行されます。簡単にするために、この重要なステップは上記の例から省略されています。
おそらく、前の関連する質問の回答ですでに書いたことをここで繰り返す必要があります。ブロックからブロックに個々の文字列を渡すと、かなりのオーバーヘッドが発生します。パイプラインが可能な限りスムーズに(摩擦なしで)実行されるようにするために、ワークロードをチャンク化(バッチ処理)することが道のりです。