Windows vs Linux - Utilisation de la mémoire du pool de threads C ++

Aug 17 2020

J'ai examiné l'utilisation de la mémoire de certains frameworks d'API REST C ++ sous Windows et Linux (Debian). En particulier, j'ai examiné ces deux frameworks: cpprestsdk et cpp-httplib . Dans les deux cas, un pool de threads est créé et utilisé pour traiter les demandes.

J'ai pris l'implémentation du pool de threads de cpp-httplib et l' ai mise dans un exemple de travail minimal ci-dessous, pour montrer l'utilisation de la mémoire que j'observe sous Windows et Linux.

#include <cassert>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using namespace std;

// TaskQueue and ThreadPool taken from https://github.com/yhirose/cpp-httplib
class TaskQueue {
public:
    TaskQueue() = default;
    virtual ~TaskQueue() = default;

    virtual void enqueue(std::function<void()> fn) = 0;
    virtual void shutdown() = 0;

    virtual void on_idle() {};
};

class ThreadPool : public TaskQueue {
public:
    explicit ThreadPool(size_t n) : shutdown_(false) {
        while (n) {
            threads_.emplace_back(worker(*this));
            cout << "Thread number " << threads_.size() + 1 << " has ID " << threads_.back().get_id() << endl;
            n--;
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ~ThreadPool() override = default;

    void enqueue(std::function<void()> fn) override {
        std::unique_lock<std::mutex> lock(mutex_);
        jobs_.push_back(fn);
        cond_.notify_one();
    }

    void shutdown() override {
        // Stop all worker threads...
        {
            std::unique_lock<std::mutex> lock(mutex_);
            shutdown_ = true;
        }

        cond_.notify_all();

        // Join...
        for (auto& t : threads_) {
            t.join();
        }
    }

private:
    struct worker {
        explicit worker(ThreadPool& pool) : pool_(pool) {}

        void operator()() {
            for (;;) {
                std::function<void()> fn;
                {
                    std::unique_lock<std::mutex> lock(pool_.mutex_);

                    pool_.cond_.wait(
                        lock, [&] { return !pool_.jobs_.empty() || pool_.shutdown_; });

                    if (pool_.shutdown_ && pool_.jobs_.empty()) { break; }

                    fn = pool_.jobs_.front();
                    pool_.jobs_.pop_front();
                }

                assert(true == static_cast<bool>(fn));
                fn();
            }
        }

        ThreadPool& pool_;
    };
    friend struct worker;

    std::vector<std::thread> threads_;
    std::list<std::function<void()>> jobs_;

    bool shutdown_;

    std::condition_variable cond_;
    std::mutex mutex_;
};

// MWE
class ContainerWrapper {
public:
    ~ContainerWrapper() {
        cout << "Destructor: data map is of size " << data.size() << endl;
    }

    map<pair<string, string>, double> data;
};

void handle_post() {
    
    cout << "Start adding data, thread ID: " << std::this_thread::get_id() << endl;

    ContainerWrapper cw;
    for (size_t i = 0; i < 5000; ++i) {
        string date = "2020-08-11";
        string id = "xxxxx_" + std::to_string(i);
        double value = 1.5;
        cw.data[make_pair(date, id)] = value;
    }

    cout << "Data map is now of size " << cw.data.size() << endl;

    unsigned pause = 3;
    cout << "Sleep for " << pause << " seconds." << endl;
    std::this_thread::sleep_for(std::chrono::seconds(pause));
}

int main(int argc, char* argv[]) {

    cout << "ID of main thread: " << std::this_thread::get_id() << endl;

    std::unique_ptr<TaskQueue> task_queue(new ThreadPool(40));

    for (size_t i = 0; i < 50; ++i) {
        
        cout << "Add task number: " << i + 1 << endl;
        task_queue->enqueue([]() { handle_post(); });

        // Sleep enough time for the task to finish.
        std::this_thread::sleep_for(std::chrono::seconds(5));
    }

    task_queue->shutdown();

    return 0;
}

Lorsque j'exécute ce MWE et que je regarde la consommation de mémoire sous Windows par rapport à Linux, j'obtiens le graphique ci-dessous. Pour Windows, j'avais l'habitude perfmond'obtenir la valeur Private Bytes . Sous Linux, j'avais l'habitude docker stats --no-stream --format "{{.MemUsage}}de consigner l'utilisation de la mémoire du conteneur. Cela correspondait resau processus d' topexécution à l'intérieur du conteneur. Il ressort du graphique que lorsqu'un thread alloue de la mémoire pour la mapvariable sous Windows dans la handle_postfonction, que la mémoire est renduelorsque la fonction se termine avant le prochain appel de la fonction. C'était le type de comportement auquel je m'attendais naïvement. Je n'ai aucune expérience concernant la façon dont le système d'exploitation traite la mémoire allouée par une fonction qui est exécutée dans un thread lorsque le thread reste en vie, c'est-à-dire comme ici dans un pool de threads. Sous Linux, il semble que l'utilisation de la mémoire ne cesse de croître et que la mémoire n'est pas restituée lorsque la fonction se termine. Lorsque les 40 threads ont été utilisés et qu'il reste 10 tâches à traiter, l'utilisation de la mémoire semble cesser d'augmenter. Quelqu'un peut-il donner une vue de haut niveau de ce qui se passe ici sous Linux du point de vue de la gestion de la mémoire ou même des conseils pour savoir où chercher des informations de base sur ce sujet spécifique?

Edit 1 : J'ai édité le graphique ci-dessous pour afficher la valeur de sortie de rssde l'exécution ps -p <pid> -h -o etimes,pid,rss,vsztoutes les secondes dans le conteneur Linux où <pid>est l'id du processus testé. Il est en accord raisonnable avec la sortie de docker stats --no-stream --format "{{.MemUsage}}.

Edit 2 : Sur la base d'un commentaire ci-dessous concernant les allocateurs STL, j'ai supprimé la carte de MWE en remplaçant la handle_postfonction par ce qui suit et en ajoutant les includes #include <cstdlib>et #include <cstring>. Maintenant, la handle_postfonction alloue et définit simplement la mémoire pour 500K ints, soit environ 2 Mo.

void handle_post() {
    
    size_t chunk = 500000 * sizeof(int);
    if (int* p = (int*)malloc(chunk)) {

        memset(p, 1, chunk);
        cout << "Allocated and used " << chunk << " bytes, thread ID: " << this_thread::get_id() << endl;
        cout << "Memory address: " << p << endl;

        unsigned pause = 3;
        cout << "Sleep for " << pause << " seconds." << endl;
        this_thread::sleep_for(chrono::seconds(pause));

        free(p);
    }
}

J'ai le même comportement ici. J'ai réduit le nombre de threads à 8 et le nombre de tâches à 10 dans l'exemple. Le graphique ci-dessous montre les résultats.

Edit 3 : J'ai ajouté les résultats de l'exécution sur une machine Linux CentOS. Il est globalement en accord avec les résultats du résultat de l'image du docker Debian.

Edit 4 : Basé sur un autre commentaire ci-dessous, j'ai exécuté l'exemple sous valgrindl' massifoutil de. Les massifparamètres de ligne de commande sont dans les images ci-dessous. Je l'ai couru avec --pages-as-heap=yes, deuxième image ci-dessous, et sans ce drapeau, première image ci-dessous. La première image suggère que ~ 2MiB de mémoire est allouée au tas (partagé) lorsque la handle_postfonction est exécutée sur un thread, puis libérée lorsque la fonction se termine. C'est ce à quoi je m'attendrais et ce que j'observe sous Windows. Je ne sais pas encore comment interpréter le graphique avec --pages-as-heap=yes, c'est-à-dire la deuxième image.

Je ne peux pas réconcilier la sortie de massifdans la première image avec la valeur de rssde la pscommande indiquée dans les graphiques ci-dessus. Si j'exécute l'image Docker et limite la mémoire du conteneur à 12 Mo en utilisant docker run --rm -it --privileged --memory="12m" --memory-swap="12m" --name=mwe_test cpp_testing:1.0, le conteneur manque de mémoire lors de la 7ème allocation et est tué par le système d'exploitation. Je rentre Killeddans la sortie et quand je regarde dmesg, je vois Killed process 25709 (cpp_testing) total-vm:529960kB, anon-rss:10268kB, file-rss:2904kB, shmem-rss:0kB. Cela suggérerait que la rssvaleur de psreflète précisément la mémoire (du tas) réellement utilisée par le processus, tandis que l' massifoutil calcule ce sur quoi il doit être basé sur malloc/ newet free/ delete. Ce n'est que mon hypothèse de base à partir de ce test. Ma question serait toujours valable, à savoir pourquoi, ou semble-t-il que, la mémoire du tas n'est pas libérée ou désallouée lorsque la handle_postfonction se termine?

Edit 5 : J'ai ajouté ci-dessous un graphique de l'utilisation de la mémoire à mesure que vous augmentez le nombre de threads dans le pool de threads de 1 à 4. Le modèle continue lorsque vous augmentez le nombre de threads jusqu'à 10, donc je n'ai pas inclus 5 à 10 Notez que j'ai ajouté une pause de 5 secondes au début de mainlaquelle se trouve la ligne plate initiale du graphique pour les ~ 5 premières secondes. Il semble que, quel que soit le nombre de threads, il y a une libération de mémoire après le traitement de la première tâche, mais que la mémoire n'est pas libérée (conservée pour être réutilisée?) Après la tâche 2 à 10. Cela peut suggérer qu'un paramètre d'allocation de mémoire est réglé pendant exécution de la tâche 1 (penser à voix haute!)?

Edit 6 : Sur la base de la suggestion de la réponse détaillée ci - dessous , j'ai défini la variable d'environnement MALLOC_ARENA_MAXsur 1 et 2 avant d'exécuter l'exemple. Cela donne la sortie dans le graphique suivant. C'est comme prévu d'après l'explication de l'effet de cette variable donnée dans la réponse.

Réponses

2 BeeOnRope Aug 20 2020 at 03:07

De nombreux allocateurs modernes, y compris celui de la glibc 2.17 que vous utilisez, utilisent plusieurs arènes (une structure qui suit les régions de mémoire libres) afin d'éviter les conflits entre les threads qui souhaitent allouer en même temps.

La mémoire libérée dans une arène n'est pas disponible pour être allouée par une autre arène (à moins qu'un type de transfert inter-arène ne soit déclenché).

Par défaut, la glibc allouera de nouvelles arènes à chaque fois qu'un nouveau thread effectuera une allocation, jusqu'à ce qu'une limite prédéfinie soit atteinte (qui par défaut est 8 * nombre de processeurs) comme vous pouvez le voir en examinant le code .

Une conséquence de ceci est que la mémoire allouée puis libérée sur un thread peut ne pas être disponible pour les autres threads car ils utilisent des zones séparées, même si ce thread ne fait aucun travail utile.

Vous pouvez essayer de régler la glibc malloc accordable glibc.malloc.arena_max à 1afin de forcer toutes les discussions sur la même arène et voir si elle change le comportement que vous observiez.

Notez que cela a tout à voir avec l'allocateur d'espace utilisateur (dans la libc) et rien à voir avec l'allocation de mémoire du système d'exploitation: le système d'exploitation n'est jamais informé que la mémoire a été libérée. Même si vous forcez une seule arène, cela ne signifie pas que l'allocateur d'espace utilisateur décidera d'informer le système d'exploitation: il peut simplement conserver la mémoire pour satisfaire une demande future (il existe des paramètres réglables pour ajuster ce comportement également).

Cependant, dans votre test, l'utilisation d'une seule arène devrait être suffisante pour éviter l'augmentation constante de l'encombrement de la mémoire puisque la mémoire est libérée avant le démarrage du thread suivant, et nous nous attendons donc à ce qu'elle soit réutilisée par la tâche suivante, qui démarre sur un autre thread.

Enfin, il convient de souligner que ce qui se passe dépend fortement de la manière exacte dont les threads sont notifiés par la variable de condition: vraisemblablement, Linux utilise un comportement FIFO, où le thread le plus récemment mis en file d'attente (en attente) sera le dernier à être notifié. Cela vous amène à parcourir tous les threads lorsque vous ajoutez des tâches, ce qui entraîne la création de nombreuses arènes. Un modèle plus efficace (pour diverses raisons) est une politique LIFO: utilisez le thread le plus récemment mis en file d'attente pour le travail suivant. Cela entraînerait la réutilisation répétée du même thread dans votre test et "résoudre" le problème.

Note finale: de nombreux allocateurs, mais pas le on dans l'ancienne version de la glibc que vous utilisez, implémentent également un cache par thread qui permet au chemin rapide d'allocation de se dérouler sans aucune opération atomique. Cela peut produire un effet similaire à l'utilisation de plusieurs arènes, et qui continue à évoluer avec le nombre de threads.