내 TPL Dataflow Pipeline이 단순한 루핑에 비해 대용량 CSV 파일을 읽는 속도가 느린 이유는 무엇입니까?
따라서 내 요구 사항은 여러 CSV 파일 (각각 최소 백만 개의 행이 있음)을 읽고 각 줄을 구문 분석하는 것입니다. 현재 파이프 라인을 분리 한 방식으로 먼저 CSV 파일을 string []으로 읽어들이는 별도의 파이프 라인을 생성 한 다음 나중에 파싱 파이프 라인을 생성 할 계획입니다.
그러나 파일 읽기 파이프 라인의 결과를 보면 CSV 파일을 반복 한 다음 행을 반복하는 것보다 훨씬 느리기 때문에 어리석은 일입니다.
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});
var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
{
using (var fileStream = File.OpenRead(filePath)) {
using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
string line;
while ((line = streamReader.ReadLine()) != null) {
var isCompleted = await lineBufferBlock.SendAsync(line);
while (!isCompleted)
{
isCompleted = await lineBufferBlock.SendAsync(line);
}
}
}
}
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});
fileReadingBlock.Completion.ContinueWith((task) =>
{
lineBufferBlock.Complete();
});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
그리고 마침내 다음과 같이 소비합니다
for (int i = 1; i < 5; i++) {
var filePath = $"C:\\Users\\File{i}.csv";
fileReadingPipeline.SendAsync(filePath);
}
fileReadingPipeline.Complete();
while (true) {
try {
var outputRows = fileReadingPipeline.Receive();
foreach (string word in outputRows)
{
}
}
catch (InvalidOperationException e) {
break;
}
}
내 직선 루프 코드는 다음과 같습니다.
for (int i = 1; i < 5; i++) {
var filePath = $"C:\\Users\\File{i}.csv";
foreach (string row in File.ReadLines(filePath))
{
foreach (string word in row.Split(","))
{
}
}
}
성능 차이는 TPL Dataflow의 경우 ~ 15 초로 내려 가고 루핑 코드의 경우 ~ 5 초입니다.
편집하다
주석의 더 나은 조언에 따라 파이프 라인에서 불필요한 lineBufferBlock을 제거했으며 이제 이것이 제 코드입니다. 그러나 성능은 여전히 동일합니다.
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
답변
파이프 라인을 구성 할 때 작업을 수행 할 하드웨어의 기능을 염두에 두어야합니다. TPL Dataflow는 그 자체로 작업을 수행하지 않고 CPU, HDD / SSD, 네트워크 카드 등에 위임합니다. 예를 들어 하드 디스크에서 파일을 읽을 때 TPL에 데이터를 읽도록 지시하는 것은 쓸모가 없습니다. 8 개의 파일이 동시에 발생하기 때문에 HDD의 기계식 암 헤드는 물리적으로 동시에 8 개 위치에 위치 할 수 없습니다. 이것은 파일 시스템에서 파일을 읽는 것이 특히 병렬 친화적이지 않다는 사실로 귀결됩니다. SSD의 경우 약간 더 좋지만 사례별로 테스트해야합니다.
병렬화의 또 다른 문제는 세분성입니다. 워크로드가 세분화되지 않고 덩어리가되기를 원합니다. 그렇지 않으면 버퍼에서 버퍼로 메시지를 전달하고 스레드 간 가시성을 보장하기 위해 각 전송 주위에 메모리 장벽을 두는 비용이 병렬 처리를 사용하여 기대할 수있는 모든 이점을 무효화 할 수 있습니다. 팁 : 단일 string
을 부분으로 분할하는 것은 매우 세분화 된 작업입니다.
이를 수행하는 방법은 다음과 같습니다.
using static MoreLinq.Extensions.BatchExtension;
var reader = new TransformManyBlock<string, string[]>(filePath =>
{
return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
});
var parser = new TransformBlock<string[], string[][]>(lines =>
{
return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
});
reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });
이 예제에서는 패키지 의 Batch연산자를 사용하여 MoreLinq줄을 하나씩 전달하는 대신 100 개의 일괄 처리로 줄을 전달합니다. 여기에서 다른 일괄 처리 옵션을 찾을 수 있습니다 .
업데이트 : 한 가지 더 제안은 ThreadPool
요청시 생성 되는 최소 스레드 수 ( SetMinThreads) 를 늘리는 것 입니다. 그렇지 않으면 의 스레드 삽입 알고리즘의 의도적 인 게으름으로 인해 작지만 눈에 띄는 (500msec) 지연이 발생 ThreadPool
하는 MaxDegreeOfParallelism = Environment.ProcessorCount
구성에 의해 즉시 포화됩니다 ThreadPool
.
ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
Environment.ProcessorCount * 2);
프로그램 시작시이 메서드를 한 번만 호출하면됩니다.