Comment démarrer et arrêter un thread de travail

Jan 03 2021

J'ai une exigence suivante qui est standard dans d'autres langages de programmation mais je ne sais pas comment faire dans Rust.

J'ai une classe, je veux écrire une méthode pour générer un thread de travail qui remplit 2 conditions:

  • Après avoir engendré le thread de travail, la fonction est de retour (donc l'autre endroit n'a pas besoin d'attendre)
  • Il existe un mécanisme pour arrêter ce thread.

Par exemple, voici mon code factice:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

J'ai une erreur à thread: JoinHandle<?>. Quel est le type de fil dans ce cas. Et mon code est-il correct pour démarrer et arrêter un thread de travail?

Réponses

3 vallentin Jan 03 2021 at 02:07

En bref, Tin join()on a JoinHandlerenvoie le résultat de la fermeture passée à thread::spawn(). Donc, dans votre cas, JoinHandle<?>il faudrait JoinHandle<()>que votre fermeture ne renvoie rien , c'est-à-dire ()(unité) .

En dehors de cela, votre code factice contient quelques problèmes supplémentaires.

  • Le type de retour de run()est incorrect et devrait au moins l'être Result<(), ()>.
  • Le threadchamp devrait Option<JoinHandle<()>être capable de gérer fn stop(&mut self) comme join()consomme le JoinHandle.
  • Cependant, vous essayez de passer &mut selfà la fermeture, ce qui entraîne beaucoup plus de problèmes, se résumant à plusieurs références mutables
    • Cela pourrait être résolu avec par exemple Mutex<A>. Cependant, si vous appelez, stop()cela pourrait conduire à une impasse à la place.

Cependant, puisqu'il s'agissait de code factice, et que vous avez clarifié dans les commentaires. Laissez-moi essayer de clarifier ce que vous vouliez dire avec quelques exemples. Cela inclut la réécriture de votre code factice.

Résultat une fois que le travailleur a terminé

Si vous n'avez pas besoin d'accéder aux données pendant l'exécution du thread de travail, vous pouvez en créer un nouveau struct WorkerData. Ensuite, run()copiez / clonez les données dont vous avez besoin A(ou telles que je les ai renommées Worker). Ensuite, dans la fermeture, vous revenez enfin data, vous pouvez donc l'acquérir à travers join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Vous n'avez pas vraiment besoin threadd'être Option<JoinHandle<WorkerData>>et à la place, vous pouvez simplement utiliser JoinHandle<WorkerData>>. Parce que si vous vouliez appeler à run()nouveau, il serait simplement plus facile de réaffecter la variable contenant le Worker.

Alors maintenant, nous pouvons simplifier Worker, supprimer le Optionet changer stoppour consommer à la threadplace, tout en créant new() -> Selfà la place de run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

partagé WorkerData

Si vous souhaitez conserver les références WorkerDataentre plusieurs threads, vous devez utiliser Arc. Étant donné que vous souhaitez également pouvoir le faire muter, vous devrez utiliser un fichier Mutex.

Si vous ne muter que dans un seul thread, vous pouvez également vous a RwLock, ce qui, par rapport à a Mutex, vous permettra de verrouiller et d'obtenir plusieurs références immuables en même temps.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Si vous ajoutez une méthode pour pouvoir obtenir datasous la forme de Arc<RwLock<WorkerData>>. Ensuite, si vous clonez Arcet verrouillez (l'intérieur RwLock) avant d'appeler stop(), cela entraînerait un blocage. Pour éviter cela, toute data()méthode doit renvoyer &WorkerDataou à la &mut WorkerDataplace du Arc. De cette façon, vous ne pourriez pas appeler stop()et provoquer une impasse.

Drapeau pour arrêter le travailleur

Si vous souhaitez réellement arrêter le thread de travail, vous devez utiliser un indicateur pour lui signaler qu'il le fait. Vous pouvez créer un drapeau sous la forme d'un fichier partagé AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Plusieurs threads et plusieurs tâches

Si vous souhaitez traiter plusieurs types de tâches, réparties sur plusieurs threads, voici un exemple plus généralisé.

Vous avez déjà mentionné l'utilisation de mpsc. Vous pouvez donc utiliser un Senderet Receiveravec un custom Tasket une TaskResultenum.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Un exemple d'utilisation du Workeravec l'envoi de tâches et la réception de résultats de tâches ressemblerait alors à ceci:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

Dans quelques cas, vous devrez peut-être implémenter Sendpour Tasket / ou TaskResult. Consultez «Comprendre le trait d'envoi» .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

Le paramètre type de a JoinHandledoit être le type de retour de la fonction du thread.

Dans ce cas, le type de retour est un tuple vide (), une unité prononcée . Il est utilisé lorsqu'il n'y a qu'une seule valeur possible et est le "type de retour" implicite des fonctions lorsqu'aucun type de retour n'est spécifié.

Vous pouvez simplement écrire JoinHandle<()>pour indiquer que la fonction ne retournera rien.

(Remarque: votre code rencontrera des problèmes de vérificateur d'emprunt avec self.call(), qui devront probablement être résolus avec Arc<Mutex<Self>>, mais c'est une autre question.)