Comment démarrer et arrêter un thread de travail
J'ai une exigence suivante qui est standard dans d'autres langages de programmation mais je ne sais pas comment faire dans Rust.
J'ai une classe, je veux écrire une méthode pour générer un thread de travail qui remplit 2 conditions:
- Après avoir engendré le thread de travail, la fonction est de retour (donc l'autre endroit n'a pas besoin d'attendre)
- Il existe un mécanisme pour arrêter ce thread.
Par exemple, voici mon code factice:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
J'ai une erreur à thread: JoinHandle<?>
. Quel est le type de fil dans ce cas. Et mon code est-il correct pour démarrer et arrêter un thread de travail?
Réponses
En bref, T
in join()on a JoinHandleJoinHandle<?>
il faudrait JoinHandle<()>
que votre fermeture ne renvoie rien , c'est-à-dire ()(unité) .
En dehors de cela, votre code factice contient quelques problèmes supplémentaires.
- Le type de retour de
run()
est incorrect et devrait au moins l'êtreResult<(), ()>
. - Le
thread
champ devraitOption<JoinHandle<()>
être capable de gérerfn stop(&mut self)
comme join()consomme leJoinHandle
. - Cependant, vous essayez de passer
&mut self
à la fermeture, ce qui entraîne beaucoup plus de problèmes, se résumant à plusieurs références mutables- Cela pourrait être résolu avec par exemple
Mutex<A>
. Cependant, si vous appelez,stop()
cela pourrait conduire à une impasse à la place.
- Cela pourrait être résolu avec par exemple
Cependant, puisqu'il s'agissait de code factice, et que vous avez clarifié dans les commentaires. Laissez-moi essayer de clarifier ce que vous vouliez dire avec quelques exemples. Cela inclut la réécriture de votre code factice.
Résultat une fois que le travailleur a terminé
Si vous n'avez pas besoin d'accéder aux données pendant l'exécution du thread de travail, vous pouvez en créer un nouveau struct WorkerData
. Ensuite, run()
copiez / clonez les données dont vous avez besoin A
(ou telles que je les ai renommées Worker
). Ensuite, dans la fermeture, vous revenez enfin data
, vous pouvez donc l'acquérir à travers join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Vous n'avez pas vraiment besoin thread
d'être Option<JoinHandle<WorkerData>>
et à la place, vous pouvez simplement utiliser JoinHandle<WorkerData>>
. Parce que si vous vouliez appeler à run()
nouveau, il serait simplement plus facile de réaffecter la variable contenant le Worker
.
Alors maintenant, nous pouvons simplifier Worker
, supprimer le Option
et changer stop
pour consommer à la thread
place, tout en créant new() -> Self
à la place de run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
partagé WorkerData
Si vous souhaitez conserver les références WorkerData
entre plusieurs threads, vous devez utiliser Arc. Étant donné que vous souhaitez également pouvoir le faire muter, vous devrez utiliser un fichier Mutex.
Si vous ne muter que dans un seul thread, vous pouvez également vous a RwLock, ce qui, par rapport à a Mutex
, vous permettra de verrouiller et d'obtenir plusieurs références immuables en même temps.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Si vous ajoutez une méthode pour pouvoir obtenir data
sous la forme de Arc<RwLock<WorkerData>>
. Ensuite, si vous clonez Arc
et verrouillez (l'intérieur RwLock
) avant d'appeler stop()
, cela entraînerait un blocage. Pour éviter cela, toute data()
méthode doit renvoyer &WorkerData
ou à la &mut WorkerData
place du Arc
. De cette façon, vous ne pourriez pas appeler stop()
et provoquer une impasse.
Drapeau pour arrêter le travailleur
Si vous souhaitez réellement arrêter le thread de travail, vous devez utiliser un indicateur pour lui signaler qu'il le fait. Vous pouvez créer un drapeau sous la forme d'un fichier partagé AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Plusieurs threads et plusieurs tâches
Si vous souhaitez traiter plusieurs types de tâches, réparties sur plusieurs threads, voici un exemple plus généralisé.
Vous avez déjà mentionné l'utilisation de mpsc. Vous pouvez donc utiliser un Senderet Receiveravec un custom Task
et une TaskResult
enum.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Un exemple d'utilisation du Worker
avec l'envoi de tâches et la réception de résultats de tâches ressemblerait alors à ceci:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
Dans quelques cas, vous devrez peut-être implémenter Send
pour Task
et / ou TaskResult
. Consultez «Comprendre le trait d'envoi» .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
Le paramètre type de a JoinHandle
doit être le type de retour de la fonction du thread.
Dans ce cas, le type de retour est un tuple vide ()
, une unité prononcée . Il est utilisé lorsqu'il n'y a qu'une seule valeur possible et est le "type de retour" implicite des fonctions lorsqu'aucun type de retour n'est spécifié.
Vous pouvez simplement écrire JoinHandle<()>
pour indiquer que la fonction ne retournera rien.
(Remarque: votre code rencontrera des problèmes de vérificateur d'emprunt avec self.call()
, qui devront probablement être résolus avec Arc<Mutex<Self>>
, mais c'est une autre question.)