상대적으로 큰 많은 개체의 C # 가비지 수집

Nov 17 2020

특정 종류의 정보에 대해 서로 다른 데이터 소스를 폴링하는 몇 가지 프로세스가 있습니다. 그들은 그것을 자주 폴링하고 백그라운드에서 수행 하므로이 정보가 필요할 때 쉽게 사용할 수 있으며 시간을 낭비하는 왕복이 필요하지 않습니다.
샘플 코드는 다음과 같습니다.

public class JournalBackgroundPoller
{
    private readonly int _clusterSize;

    private readonly IConfiguration _configuration;

    Dictionary<int, string> _journalAddresses;
    private readonly Random _localRandom;
    private readonly Task _runHolder;

    internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

    public JournalBackgroundPoller(IConfiguration configuration)
    {
        _localRandom = new Random();

        _configuration = configuration;
        _clusterSize = 20;//for the sake of demo

        _journalAddresses = //{{1, "SOME ADDR1"}, {2, "SOME ADDR 2"}};

        _runHolder = BuildAndRun();
    }

    private Task BuildAndRun()
    {
        var pollingTasks = new List<Task>();
        var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);

        PopulateShardsRegistry();

        foreach (var js in _journalAddresses)
        {
            var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
                new ExecutionDataflowBlockOptions
                { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });

            var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

            buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);

            dataProcessor.LinkTo(dataStorer);
            dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

            pollingTasks.Add(PollInfinitely(js, buffer));
        }

        var r = Task.WhenAll(pollingTasks);
        return r;
    }

    private void PopulateShardsRegistry()
    {
        try
        {
            for (int i = 0; i < _clusterSize; i++)
            {
                var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Could `t initialize shards registry");
        }
    }

    private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
    {
        while (true)
        {
            try
            {
                //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
                var journalEntries = new List<JournalEntryResponseItem>(200000);

                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = journalEntries });
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data"); buffer.Post( new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() }); } await Task.Delay(_localRandom.Next(400, 601)); } } private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input) { try { if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any()) { return input; } foreach (var journalEntry in input.JournalEntryResponseItems) { //do some transformations here } return input; } catch (Exception ex) { Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
            return null;
        }
    }

    private void StoreValuesInBuffer(JournalResponsesWrapper input)
    {
        try
        {
            ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Could not write content to dictionary");
        }
    }
}

단순성을 위해 저널 관련 엔티티는 다음과 같습니다.

class JournalEntryResponseItem
{
    public string SomeProperty1 { get; set; }

    public string SomeProperty2 { get; set; }
}

class JournalResponsesWrapper
{
    public KeyValuePair<int, string> JournalDataSource { get; set; }

    public List<JournalEntryResponseItem> JournalEntryResponseItems { get; set; }
}

제공된 코드의 전체적인 문제는 분명히 짧은 시간에 LOH로 끝날 수있는 비교적 많은 양의 개체를 만들고 있다는 것입니다. 데이터 소스는 항상 최신 항목을 제공하므로 이전 항목을 유지할 필요가 없습니다 (구별되지 않으므로 할 수 없음). 내 질문은 가비지 수집 빈도를 줄일 수 있도록 메모리 사용, 개체 생성 및 교체 왕복을 최적화 할 수 있는지 여부입니다. 지금은 가비지 콜렉션이 ~ 5-10 초마다 발생합니다.

UPD 1 : 데이터를 통해 데이터에 액세스 ResultsBuffer하고 새로 고치기 전에 동일한 세트를 여러 번 읽을 수 있습니다. 하나의 특정 데이터 세트가 한 번만 읽혀 지거나 전혀 읽히지 않는다는 보장은 없습니다. 내 큰 개체는 List<JournalEntryResponseItem>처음에는 데이터 소스에서 가져온 다음 ResultsBuffer .

UPD 2 : 데이터 소스에는이 "샤드"의 모든 엔터티를 한 번에 반환하는 엔드 포인트가 하나만 있습니다. 요청하는 동안 필터링을 적용 할 수 없습니다. 응답 엔터티에는 고유 한 키 / 식별자가 없습니다.

UPD 3 : 일부 답변은 먼저 앱을 측정 / 프로파일 링하도록 제안합니다. 이것은이 특별한 경우에 완벽하게 유효한 제안이지만 다음과 같은 관찰로 인해 분명히 메모리 / GC와 관련된 것입니다.

  1. 시각적 스로틀 링은 앱의 RAM 사용량이 일정 시간 동안 꾸준히 증가한 후 급격히 감소하는 순간에 정확히 발생합니다.
  2. 저널 소스를 X 개 더 추가하면 앱의 메모리가 서버에서 사용 가능한 메모리를 모두 차지할 때까지 증가한 다음 메모리가 급격히 줄어들고 앱이 메모리 제한에 도달 할 때까지 계속 작동합니다 (1 ~ 3 초). 다시.

답변

2 LuckyBrain Nov 24 2020 at 02:42

뒤에는 List<T>항상 T[]연속적인 항목이 있으므로 200000으로 치수를 지정하면 LOH에 곧바로 배치됩니다. 이를 피하기 위해 물리적 차원 및 Post일괄 처리 목록 대신 간단한 논리적 파티셔닝을 사용하는 것이 좋습니다 . 이렇게하면 각 투표 중에 거대한 목록이 LOH로 이동하지만 다음 GC 2 세대 컬렉션에서 수집됩니다 (더 이상 참조가 없는지 확인하십시오). LOH는 거의 비워 지지만 Managed Heap에서 발생하는 추가 복사 작업으로 인해 이전보다 더 많은 GC Generation 2 컬렉션이 있습니다. 그것은 작은 변화이며 새로운 JournalBackgroundPoller수업을 제공합니다 .

public class JournalBackgroundPoller
{
    private readonly int _clusterSize;

    private readonly IConfiguration _configuration;

    Dictionary<int, string> _journalAddresses;
    private readonly Random _localRandom;
    private readonly Task _runHolder;

    internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

    public JournalBackgroundPoller(IConfiguration configuration)
    {
        _localRandom = new Random();

        _configuration = configuration;
        _clusterSize = 20;//for the sake of demo

        // _journalAddresses = //{{1, "SOME ADDR1"}, {2, "SOME ADDR 2"}};
        _journalAddresses = new Dictionary<int, string>
        {
            { 1, "SOME ADDR1" },
            { 2, "SOME ADDR 2" }
        };

        _runHolder = BuildAndRun();
    }

    private Task BuildAndRun()
    {
        var pollingTasks = new List<Task>();
        var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);

        PopulateShardsRegistry();

        foreach (var js in _journalAddresses)
        {
            var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
                new ExecutionDataflowBlockOptions
                { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });

            var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

            buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);

            dataProcessor.LinkTo(dataStorer);
            dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

            pollingTasks.Add(PollInfinitely(js, buffer));
        }

        var r = Task.WhenAll(pollingTasks);
        return r;
    }

    private void PopulateShardsRegistry()
    {
        try
        {
            for (int i = 0; i < _clusterSize; i++)
            {
                var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Could `t initialize shards registry");
        }
    }

    private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
    {
        while (true)
        {
            try
            {
                //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
                var journalEntries = new List<JournalEntryResponseItem>(200000);

                // NOTE:
                // We need to avoid references to the huge list so GC collects it ASAP in the next
                // generation 2 collection: after that, nothing else goes to the LOH.
                const int PartitionSize = 1000;
                for (var index = 0; index < journalEntries.Count; index += PartitionSize)
                {
                    var journalEntryResponseItems = journalEntries.GetRange(index, PartitionSize);
                    buffer.Post(
                        new JournalResponsesWrapper
                        {
                            JournalDataSource = dataSourceInfo,
                            JournalEntryResponseItems = journalEntryResponseItems
                        });
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data"); buffer.Post( new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() }); } await Task.Delay(_localRandom.Next(400, 601)); } } private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input) { try { if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any()) { return input; } foreach (var journalEntry in input.JournalEntryResponseItems) { //do some transformations here } return input; } catch (Exception ex) { Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
            return null;
        }
    }

    private void StoreValuesInBuffer(JournalResponsesWrapper input)
    {
        try
        {
            ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Could not write content to dictionary");
        }
    }
}

30 초 후 원래 메모리 사용량의 스냅 샷을 살펴보세요.

30 초 후 최적화 된 메모리 사용량의 스냅 샷입니다.

차이점에 유의하십시오.

  • 희소 배열 : JournalEntryResponseItem[]길이가 200,000 인 낭비 된 1,600,000에서 없음.
  • LOH 사용량 : 3.05MB 에서 없음.
1 MarkRabjohn Nov 25 2020 at 22:29

클라이언트에 다운로드되는 데이터의 양을 처리하도록 PollInifinitely를 조정할 수 있다고 확신하지만, 다운로드 된 데이터가 큰 목록을 분할하는 것은 상당히 까다 롭고 좀 더 깊은 작업이 필요합니다.

처음부터 스트리밍이 아닌 클라이언트 / 소비자를 사용하여 200000 개의 레코드를 다운로드하면 항상 일종의 대형 어레이로 끝날 것입니다. 이는 피할 수없는 일입니다. 스트리밍 할 때 JSON (또는 XML 등)을 구문 분석 할 수있는 라이브러리를 찾거나 작성해야합니다. 그런 다음 개별 목록의 크기를 선택할 수 있으므로 200,000 개의 목록 대신 200 개의 목록이 있습니다. 1,000 개 기록 중. 클라이언트 측에서 레코드 수를 제어 할 수 있다면 200,000 개 대신 1000 개의 레코드 만 요청할 수 있습니다.

많은 데이터를 저장하는 캐시를 작성하는지 아니면 다른 쪽 끝에 소비자가있는 스트리밍 체인의 서비스를 작성하는지 모르겠습니다. 소비자를 가정하면 PollInfinitely에서 지연과 함께 Semaphore를 사용해야합니다. Semaphore 수를 유지하면 쉽게 최대 레코드 수에서 다운로드를 중단 할 수 있습니다 (SemaphoreSlim도 대기 가능합니다).

셋째, 메모리 변동 및 가비지 수집에 실제로 문제가있는 경우 스토리지를 해제되지 않는 단일 대용량 할당으로 만들 수 있습니다. 클래스 대신 구조체를 사용하고 문자열 대신 고정 크기 바이트 배열을 사용하십시오. 최대 크기의 링 버퍼를 시뮬레이션 할 수있는 충분한 코드를 작성하면 수신 클래스에서 링 버퍼로 데이터를 블리트해야합니다. 이것은 당신이 가지고있는 참조 할당보다 느릴 것이지만 당신은 결코 당신의 메모리를 방출하는 가비지 콜렉션을 보지 못할 것입니다-당신의 세마포어에서 최대 링 버퍼 용량을 사용하십시오.

데이터를 스트리밍하는 경우 너무 빨리 읽어서 이점을 얻을 수 없으며 소비자가 산발적으로 만 뛰어 들었을 때만 그렇게 할 수 있습니다.

나는 여기서 올바른 길을 가고 있기를 바랍니다.