상대적으로 큰 많은 개체의 C # 가비지 수집
특정 종류의 정보에 대해 서로 다른 데이터 소스를 폴링하는 몇 가지 프로세스가 있습니다. 그들은 그것을 자주 폴링하고 백그라운드에서 수행 하므로이 정보가 필요할 때 쉽게 사용할 수 있으며 시간을 낭비하는 왕복이 필요하지 않습니다.
샘플 코드는 다음과 같습니다.
public class JournalBackgroundPoller
{
private readonly int _clusterSize;
private readonly IConfiguration _configuration;
Dictionary<int, string> _journalAddresses;
private readonly Random _localRandom;
private readonly Task _runHolder;
internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();
public JournalBackgroundPoller(IConfiguration configuration)
{
_localRandom = new Random();
_configuration = configuration;
_clusterSize = 20;//for the sake of demo
_journalAddresses = //{{1, "SOME ADDR1"}, {2, "SOME ADDR 2"}};
_runHolder = BuildAndRun();
}
private Task BuildAndRun()
{
var pollingTasks = new List<Task>();
var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);
PopulateShardsRegistry();
foreach (var js in _journalAddresses)
{
var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });
var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });
buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);
dataProcessor.LinkTo(dataStorer);
dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());
pollingTasks.Add(PollInfinitely(js, buffer));
}
var r = Task.WhenAll(pollingTasks);
return r;
}
private void PopulateShardsRegistry()
{
try
{
for (int i = 0; i < _clusterSize; i++)
{
var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
}
}
catch (Exception e)
{
Console.WriteLine("Could `t initialize shards registry");
}
}
private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
{
while (true)
{
try
{
//here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
var journalEntries = new List<JournalEntryResponseItem>(200000);
buffer.Post(
new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = journalEntries });
}
catch (Exception ex)
{
Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data"); buffer.Post( new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() }); } await Task.Delay(_localRandom.Next(400, 601)); } } private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input) { try { if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any()) { return input; } foreach (var journalEntry in input.JournalEntryResponseItems) { //do some transformations here } return input; } catch (Exception ex) { Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
return null;
}
}
private void StoreValuesInBuffer(JournalResponsesWrapper input)
{
try
{
ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
}
catch (Exception ex)
{
Console.WriteLine($"Could not write content to dictionary");
}
}
}
단순성을 위해 저널 관련 엔티티는 다음과 같습니다.
class JournalEntryResponseItem
{
public string SomeProperty1 { get; set; }
public string SomeProperty2 { get; set; }
}
class JournalResponsesWrapper
{
public KeyValuePair<int, string> JournalDataSource { get; set; }
public List<JournalEntryResponseItem> JournalEntryResponseItems { get; set; }
}
제공된 코드의 전체적인 문제는 분명히 짧은 시간에 LOH로 끝날 수있는 비교적 많은 양의 개체를 만들고 있다는 것입니다. 데이터 소스는 항상 최신 항목을 제공하므로 이전 항목을 유지할 필요가 없습니다 (구별되지 않으므로 할 수 없음). 내 질문은 가비지 수집 빈도를 줄일 수 있도록 메모리 사용, 개체 생성 및 교체 왕복을 최적화 할 수 있는지 여부입니다. 지금은 가비지 콜렉션이 ~ 5-10 초마다 발생합니다.
UPD 1 : 데이터를 통해 데이터에 액세스 ResultsBuffer
하고 새로 고치기 전에 동일한 세트를 여러 번 읽을 수 있습니다. 하나의 특정 데이터 세트가 한 번만 읽혀 지거나 전혀 읽히지 않는다는 보장은 없습니다. 내 큰 개체는 List<JournalEntryResponseItem>
처음에는 데이터 소스에서 가져온 다음 ResultsBuffer
.
UPD 2 : 데이터 소스에는이 "샤드"의 모든 엔터티를 한 번에 반환하는 엔드 포인트가 하나만 있습니다. 요청하는 동안 필터링을 적용 할 수 없습니다. 응답 엔터티에는 고유 한 키 / 식별자가 없습니다.
UPD 3 : 일부 답변은 먼저 앱을 측정 / 프로파일 링하도록 제안합니다. 이것은이 특별한 경우에 완벽하게 유효한 제안이지만 다음과 같은 관찰로 인해 분명히 메모리 / GC와 관련된 것입니다.
- 시각적 스로틀 링은 앱의 RAM 사용량이 일정 시간 동안 꾸준히 증가한 후 급격히 감소하는 순간에 정확히 발생합니다.
- 저널 소스를 X 개 더 추가하면 앱의 메모리가 서버에서 사용 가능한 메모리를 모두 차지할 때까지 증가한 다음 메모리가 급격히 줄어들고 앱이 메모리 제한에 도달 할 때까지 계속 작동합니다 (1 ~ 3 초). 다시.
답변
뒤에는 List<T>
항상 T[]
연속적인 항목이 있으므로 200000으로 치수를 지정하면 LOH에 곧바로 배치됩니다. 이를 피하기 위해 물리적 차원 및 Post
일괄 처리 목록 대신 간단한 논리적 파티셔닝을 사용하는 것이 좋습니다 . 이렇게하면 각 투표 중에 거대한 목록이 LOH로 이동하지만 다음 GC 2 세대 컬렉션에서 수집됩니다 (더 이상 참조가 없는지 확인하십시오). LOH는 거의 비워 지지만 Managed Heap에서 발생하는 추가 복사 작업으로 인해 이전보다 더 많은 GC Generation 2 컬렉션이 있습니다. 그것은 작은 변화이며 새로운 JournalBackgroundPoller
수업을 제공합니다 .
public class JournalBackgroundPoller
{
private readonly int _clusterSize;
private readonly IConfiguration _configuration;
Dictionary<int, string> _journalAddresses;
private readonly Random _localRandom;
private readonly Task _runHolder;
internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();
public JournalBackgroundPoller(IConfiguration configuration)
{
_localRandom = new Random();
_configuration = configuration;
_clusterSize = 20;//for the sake of demo
// _journalAddresses = //{{1, "SOME ADDR1"}, {2, "SOME ADDR 2"}};
_journalAddresses = new Dictionary<int, string>
{
{ 1, "SOME ADDR1" },
{ 2, "SOME ADDR 2" }
};
_runHolder = BuildAndRun();
}
private Task BuildAndRun()
{
var pollingTasks = new List<Task>();
var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);
PopulateShardsRegistry();
foreach (var js in _journalAddresses)
{
var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });
var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });
buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);
dataProcessor.LinkTo(dataStorer);
dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());
pollingTasks.Add(PollInfinitely(js, buffer));
}
var r = Task.WhenAll(pollingTasks);
return r;
}
private void PopulateShardsRegistry()
{
try
{
for (int i = 0; i < _clusterSize; i++)
{
var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
}
}
catch (Exception e)
{
Console.WriteLine("Could `t initialize shards registry");
}
}
private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
{
while (true)
{
try
{
//here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
var journalEntries = new List<JournalEntryResponseItem>(200000);
// NOTE:
// We need to avoid references to the huge list so GC collects it ASAP in the next
// generation 2 collection: after that, nothing else goes to the LOH.
const int PartitionSize = 1000;
for (var index = 0; index < journalEntries.Count; index += PartitionSize)
{
var journalEntryResponseItems = journalEntries.GetRange(index, PartitionSize);
buffer.Post(
new JournalResponsesWrapper
{
JournalDataSource = dataSourceInfo,
JournalEntryResponseItems = journalEntryResponseItems
});
}
}
catch (Exception ex)
{
Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data"); buffer.Post( new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() }); } await Task.Delay(_localRandom.Next(400, 601)); } } private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input) { try { if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any()) { return input; } foreach (var journalEntry in input.JournalEntryResponseItems) { //do some transformations here } return input; } catch (Exception ex) { Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
return null;
}
}
private void StoreValuesInBuffer(JournalResponsesWrapper input)
{
try
{
ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
}
catch (Exception ex)
{
Console.WriteLine($"Could not write content to dictionary");
}
}
}
30 초 후 원래 메모리 사용량의 스냅 샷을 살펴보세요.

30 초 후 최적화 된 메모리 사용량의 스냅 샷입니다.

차이점에 유의하십시오.
- 희소 배열 :
JournalEntryResponseItem[]
길이가 200,000 인 낭비 된 1,600,000에서 없음. - LOH 사용량 : 3.05MB 에서 없음.
클라이언트에 다운로드되는 데이터의 양을 처리하도록 PollInifinitely를 조정할 수 있다고 확신하지만, 다운로드 된 데이터가 큰 목록을 분할하는 것은 상당히 까다 롭고 좀 더 깊은 작업이 필요합니다.
처음부터 스트리밍이 아닌 클라이언트 / 소비자를 사용하여 200000 개의 레코드를 다운로드하면 항상 일종의 대형 어레이로 끝날 것입니다. 이는 피할 수없는 일입니다. 스트리밍 할 때 JSON (또는 XML 등)을 구문 분석 할 수있는 라이브러리를 찾거나 작성해야합니다. 그런 다음 개별 목록의 크기를 선택할 수 있으므로 200,000 개의 목록 대신 200 개의 목록이 있습니다. 1,000 개 기록 중. 클라이언트 측에서 레코드 수를 제어 할 수 있다면 200,000 개 대신 1000 개의 레코드 만 요청할 수 있습니다.
많은 데이터를 저장하는 캐시를 작성하는지 아니면 다른 쪽 끝에 소비자가있는 스트리밍 체인의 서비스를 작성하는지 모르겠습니다. 소비자를 가정하면 PollInfinitely에서 지연과 함께 Semaphore를 사용해야합니다. Semaphore 수를 유지하면 쉽게 최대 레코드 수에서 다운로드를 중단 할 수 있습니다 (SemaphoreSlim도 대기 가능합니다).
셋째, 메모리 변동 및 가비지 수집에 실제로 문제가있는 경우 스토리지를 해제되지 않는 단일 대용량 할당으로 만들 수 있습니다. 클래스 대신 구조체를 사용하고 문자열 대신 고정 크기 바이트 배열을 사용하십시오. 최대 크기의 링 버퍼를 시뮬레이션 할 수있는 충분한 코드를 작성하면 수신 클래스에서 링 버퍼로 데이터를 블리트해야합니다. 이것은 당신이 가지고있는 참조 할당보다 느릴 것이지만 당신은 결코 당신의 메모리를 방출하는 가비지 콜렉션을 보지 못할 것입니다-당신의 세마포어에서 최대 링 버퍼 용량을 사용하십시오.
데이터를 스트리밍하는 경우 너무 빨리 읽어서 이점을 얻을 수 없으며 소비자가 산발적으로 만 뛰어 들었을 때만 그렇게 할 수 있습니다.
나는 여기서 올바른 길을 가고 있기를 바랍니다.
표