Windows vs Linux - Utilizzo della memoria del pool di thread C ++

Aug 17 2020

Ho esaminato l'utilizzo della memoria di alcuni framework API REST C ++ in Windows e Linux (Debian). In particolare, ho esaminato questi due framework: cpprestsdk e cpp-httplib . In entrambi, un pool di thread viene creato e utilizzato per soddisfare le richieste.

Ho preso l'implementazione del pool di thread da cpp-httplib e l' ho inserita in un esempio di lavoro minimo di seguito, per mostrare l'utilizzo della memoria che sto osservando su Windows e Linux.

#include <cassert>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using namespace std;

// TaskQueue and ThreadPool taken from https://github.com/yhirose/cpp-httplib
class TaskQueue {
public:
    TaskQueue() = default;
    virtual ~TaskQueue() = default;

    virtual void enqueue(std::function<void()> fn) = 0;
    virtual void shutdown() = 0;

    virtual void on_idle() {};
};

class ThreadPool : public TaskQueue {
public:
    explicit ThreadPool(size_t n) : shutdown_(false) {
        while (n) {
            threads_.emplace_back(worker(*this));
            cout << "Thread number " << threads_.size() + 1 << " has ID " << threads_.back().get_id() << endl;
            n--;
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ~ThreadPool() override = default;

    void enqueue(std::function<void()> fn) override {
        std::unique_lock<std::mutex> lock(mutex_);
        jobs_.push_back(fn);
        cond_.notify_one();
    }

    void shutdown() override {
        // Stop all worker threads...
        {
            std::unique_lock<std::mutex> lock(mutex_);
            shutdown_ = true;
        }

        cond_.notify_all();

        // Join...
        for (auto& t : threads_) {
            t.join();
        }
    }

private:
    struct worker {
        explicit worker(ThreadPool& pool) : pool_(pool) {}

        void operator()() {
            for (;;) {
                std::function<void()> fn;
                {
                    std::unique_lock<std::mutex> lock(pool_.mutex_);

                    pool_.cond_.wait(
                        lock, [&] { return !pool_.jobs_.empty() || pool_.shutdown_; });

                    if (pool_.shutdown_ && pool_.jobs_.empty()) { break; }

                    fn = pool_.jobs_.front();
                    pool_.jobs_.pop_front();
                }

                assert(true == static_cast<bool>(fn));
                fn();
            }
        }

        ThreadPool& pool_;
    };
    friend struct worker;

    std::vector<std::thread> threads_;
    std::list<std::function<void()>> jobs_;

    bool shutdown_;

    std::condition_variable cond_;
    std::mutex mutex_;
};

// MWE
class ContainerWrapper {
public:
    ~ContainerWrapper() {
        cout << "Destructor: data map is of size " << data.size() << endl;
    }

    map<pair<string, string>, double> data;
};

void handle_post() {
    
    cout << "Start adding data, thread ID: " << std::this_thread::get_id() << endl;

    ContainerWrapper cw;
    for (size_t i = 0; i < 5000; ++i) {
        string date = "2020-08-11";
        string id = "xxxxx_" + std::to_string(i);
        double value = 1.5;
        cw.data[make_pair(date, id)] = value;
    }

    cout << "Data map is now of size " << cw.data.size() << endl;

    unsigned pause = 3;
    cout << "Sleep for " << pause << " seconds." << endl;
    std::this_thread::sleep_for(std::chrono::seconds(pause));
}

int main(int argc, char* argv[]) {

    cout << "ID of main thread: " << std::this_thread::get_id() << endl;

    std::unique_ptr<TaskQueue> task_queue(new ThreadPool(40));

    for (size_t i = 0; i < 50; ++i) {
        
        cout << "Add task number: " << i + 1 << endl;
        task_queue->enqueue([]() { handle_post(); });

        // Sleep enough time for the task to finish.
        std::this_thread::sleep_for(std::chrono::seconds(5));
    }

    task_queue->shutdown();

    return 0;
}

Quando eseguo questo MWE e guardo il consumo di memoria in Windows rispetto a Linux, ottengo il grafico qui sotto. Per Windows, ho usato perfmonper ottenere il valore Byte privati . In Linux, ho usato docker stats --no-stream --format "{{.MemUsage}}per registrare l'utilizzo della memoria del contenitore. Ciò era in linea con resil processo in topesecuzione all'interno del contenitore. Dal grafico risulta che quando un thread alloca memoria per la mapvariabile in Windows nella handle_postfunzione, la memoria viene restituitaquando la funzione termina prima della successiva chiamata alla funzione. Questo era il tipo di comportamento che mi aspettavo ingenuamente. Non ho esperienza su come il sistema operativo gestisce la memoria allocata da una funzione che viene eseguita in un thread quando il thread rimane attivo, ad esempio come qui in un pool di thread. Su Linux, sembra che l'utilizzo della memoria continui a crescere e che la memoria non venga restituita quando la funzione esce. Quando tutti i 40 thread sono stati utilizzati e ci sono altre 10 attività da elaborare, l'utilizzo della memoria sembra smettere di crescere. Qualcuno può dare una visione di alto livello di ciò che sta accadendo qui in Linux dal punto di vista della gestione della memoria o anche alcuni suggerimenti su dove cercare alcune informazioni di base su questo argomento specifico?

Modifica 1 : ho modificato il grafico seguente per mostrare il valore di output rssdell'esecuzione ps -p <pid> -h -o etimes,pid,rss,vszogni secondo nel contenitore Linux dove si <pid>trova l'id del processo in fase di test. È in ragionevole accordo con l'uscita di docker stats --no-stream --format "{{.MemUsage}}.

Modifica 2 : in base a un commento di seguito relativo agli allocatori STL, ho rimosso la mappa da MWE sostituendo la handle_postfunzione con quanto segue e aggiungendo gli include #include <cstdlib>e #include <cstring>. Ora, la handle_postfunzione alloca e imposta solo la memoria per 500 intKB, che è circa 2 MiB.

void handle_post() {
    
    size_t chunk = 500000 * sizeof(int);
    if (int* p = (int*)malloc(chunk)) {

        memset(p, 1, chunk);
        cout << "Allocated and used " << chunk << " bytes, thread ID: " << this_thread::get_id() << endl;
        cout << "Memory address: " << p << endl;

        unsigned pause = 3;
        cout << "Sleep for " << pause << " seconds." << endl;
        this_thread::sleep_for(chrono::seconds(pause));

        free(p);
    }
}

Ottengo lo stesso comportamento qui. Ho ridotto il numero di thread a 8 e il numero di attività a 10 nell'esempio. Il grafico sotto mostra i risultati.

Modifica 3 : ho aggiunto i risultati dell'esecuzione su una macchina Linux CentOS. Concorda ampiamente con i risultati del risultato dell'immagine docker di Debian.

Edit 4 : basato su un altro commento qui sotto, ho eseguito l'esempio in valgrind's massifstrumento. I massifparametri della riga di comando sono nelle immagini seguenti. L'ho eseguito con --pages-as-heap=yes, seconda immagine sotto e senza questa bandiera, prima immagine sotto. La prima immagine suggerisce che la memoria di ~ 2MiB viene allocata all'heap (condiviso) quando la handle_postfunzione viene eseguita su un thread e quindi liberata quando la funzione esce. Questo è quello che mi aspetterei e quello che osservo su Windows. Non sono ancora sicuro di come interpretare il grafico con --pages-as-heap=yes, cioè la seconda immagine.

Non riesco a conciliare l'output di massifnella prima immagine con il valore di rssdal pscomando mostrato nei grafici sopra. Se eseguo l'immagine Docker e limito la memoria del contenitore a 12 MB utilizzando docker run --rm -it --privileged --memory="12m" --memory-swap="12m" --name=mwe_test cpp_testing:1.0, il contenitore esaurisce la memoria alla settima allocazione e viene ucciso dal sistema operativo. Entro Killedin uscita e quando guardo dmesgvedo Killed process 25709 (cpp_testing) total-vm:529960kB, anon-rss:10268kB, file-rss:2904kB, shmem-rss:0kB. Ciò suggerirebbe che il rssvalore di psriflette accuratamente la memoria (heap) effettivamente utilizzata dal processo mentre lo massifstrumento sta calcolando su cosa dovrebbe basarsi malloc/ newe free/ deletechiama. Questa è solo la mia ipotesi di base da questo test. La mia domanda sarebbe ancora valida, ovvero perché, o sembra che la memoria dell'heap non venga liberata o deallocata quando la handle_postfunzione esce?

Modifica 5 : ho aggiunto sotto un grafico dell'utilizzo della memoria quando si aumenta il numero di thread nel pool di thread da 1 a 4. Lo schema continua mentre si aumenta il numero di thread fino a 10, quindi non ho incluso da 5 a 10 Nota che ho aggiunto una pausa di 5 secondi all'inizio della mainquale è la linea piatta iniziale nel grafico per i primi ~ 5 secondi. Sembra che, indipendentemente dal conteggio dei thread, ci sia un rilascio di memoria dopo che la prima attività è stata elaborata ma che la memoria non viene rilasciata (conservata per il riutilizzo?) Dopo l'attività da 2 a 10. Potrebbe suggerire che alcuni parametri di allocazione della memoria siano regolati durante esecuzione dell'attività 1 (solo pensando ad alta voce!)?

Modifica 6 : in base al suggerimento della risposta dettagliata di seguito , ho impostato la variabile di ambiente MALLOC_ARENA_MAXsu 1 e 2 prima di eseguire l'esempio. Questo fornisce l'output nel grafico seguente. Ciò è come previsto in base alla spiegazione dell'effetto di questa variabile fornita nella risposta.

Risposte

2 BeeOnRope Aug 20 2020 at 03:07

Molti allocatori moderni, incluso quello in glibc 2.17 che stai usando, usano più arene (una struttura che tiene traccia delle regioni di memoria libera) per evitare contese tra thread che vogliono allo stesso tempo allocare.

La memoria liberata in un'arena non è disponibile per essere allocata da un'altra arena (a meno che non venga attivato un qualche tipo di trasferimento cross-arena).

Per impostazione predefinita, glibc allocherà nuove arene ogni volta che un nuovo thread effettua un'allocazione, fino a quando non viene raggiunto un limite predefinito (il cui valore predefinito è 8 * numero di CPU) come puoi vedere esaminando il codice .

Una conseguenza di ciò è che la memoria allocata e poi liberata su un thread potrebbe non essere disponibile per altri thread poiché utilizzano aree separate, anche se quel thread non sta facendo alcun lavoro utile.

Puoi provare a impostare il sintonizzabile glibc malloc glibc.malloc.arena_max su 1per forzare tutti i thread nella stessa arena e vedere se cambia il comportamento che stavi osservando.

Nota che questo ha tutto a che fare con l'allocatore dello spazio utente (in libc) e niente a che fare con l'allocazione della memoria del SO: il SO non viene mai informato che la memoria è stata liberata. Anche se si forza una singola arena, ciò non significa che l'allocatore dello spazio utente deciderà di informare il sistema operativo: potrebbe semplicemente mantenere la memoria in giro per soddisfare una richiesta futura (ci sono parametri per regolare anche questo comportamento).

Tuttavia, nel tuo test l'utilizzo di una singola arena dovrebbe essere sufficiente per evitare il costante aumento dell'impronta di memoria poiché la memoria viene liberata prima dell'inizio del thread successivo e quindi ci aspettiamo che venga riutilizzata dall'attività successiva, che inizia su un thread diverso.

Infine, vale la pena sottolineare che ciò che accade dipende in larga misura dal modo esatto in cui i thread vengono notificati dalla variabile condition: presumibilmente Linux utilizza un comportamento FIFO, dove l'ultimo thread in coda (in attesa) sarà l'ultimo a ricevere la notifica. Questo ti fa scorrere tutti i thread mentre aggiungi attività, causando la creazione di molte arene. Un modello più efficiente (per una serie di motivi) è una politica LIFO: usa il thread accodato più di recente per il lavoro successivo. Ciò causerebbe il riutilizzo ripetuto dello stesso thread nel test e "risolverebbe" il problema.

Nota finale: molti allocatori, ma non il inserito per la versione precedente di glibc che si sta utilizzando, implementano anche una cache per-thread , che permette il percorso di assegnazione rapida di procedere senza alcun operazioni atomiche. Questo può produrre un effetto simile all'uso di più arene e che continua a ridimensionarsi con il numero di thread.