So starten und stoppen Sie einen Worker-Thread
Ich habe eine folgende Anforderung, die in anderen Programmiersprachen Standard ist, aber ich weiß nicht, wie ich in Rust vorgehen soll.
Ich habe eine Klasse und möchte eine Methode schreiben, um einen Arbeitsthread zu erzeugen, der zwei Bedingungen erfüllt:
- Nach dem Laichen des Worker-Threads wird die Funktion zurückgegeben (sodass ein anderer Ort nicht warten muss).
- Es gibt einen Mechanismus zum Stoppen dieses Threads.
Hier ist zum Beispiel mein Dummy-Code:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
Ich habe einen Fehler bei thread: JoinHandle<?>
. Was ist die Art des Threads in diesem Fall. Und ist mein Code korrekt, um einen Arbeitsthread zu starten und zu stoppen?
Antworten
Kurz gesagt, das T
In- join()On JoinHandleJoinHandle<?>
müsste es also sein, JoinHandle<()>
da Ihr Verschluss nichts zurückgibt , dh ()(Einheit) .
Abgesehen davon enthält Ihr Dummy-Code einige zusätzliche Probleme.
- Der Rückgabetyp von
run()
ist falsch und müsste es zumindest seinResult<(), ()>
. - Das
thread
Feld müssteOption<JoinHandle<()>
in der Lage sein, mitfn stop(&mut self)
dem join()Verbrauch umzugehenJoinHandle
. - Sie versuchen jedoch,
&mut self
zum Abschluss überzugehen, was viel mehr Probleme mit sich bringt und sich auf mehrere veränderbare Referenzen beschränkt- Dies könnte zB gelöst werden
Mutex<A>
. Wenn Sie jedoch anrufenstop()
, kann dies stattdessen zu einem Deadlock führen.
- Dies könnte zB gelöst werden
Da es sich jedoch um Dummy-Code handelte, haben Sie dies in den Kommentaren klargestellt. Lassen Sie mich versuchen, anhand einiger Beispiele zu verdeutlichen, was Sie gemeint haben. Dazu gehört, dass ich Ihren Dummy-Code neu schreibe.
Ergebnis, nachdem der Arbeiter fertig ist
Wenn Sie keinen Zugriff auf die Daten benötigen, während der Arbeitsthread ausgeführt wird, können Sie einen neuen erstellen struct WorkerData
. Dann run()
kopieren / klonen Sie die Daten, von denen Sie sie benötigen A
(oder wie ich sie umbenannt habe Worker
). Dann kehren Sie im Verschluss endlich wieder data
zurück, damit Sie es durch erwerben können join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Sie müssen nicht wirklich thread
sein Option<JoinHandle<WorkerData>>
und könnten stattdessen nur verwenden JoinHandle<WorkerData>>
. Denn wenn Sie run()
erneut aufrufen möchten, ist es einfacher, die Variable, die das enthält, neu zuzuweisen Worker
.
So , jetzt können wir vereinfachen Worker
, das Entfernen Option
und verändern stop
zu konsumieren thread
statt, zusammen mit der Erstellung new() -> Self
statt run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Geteilt WorkerData
Wenn Sie Verweise auf WorkerData
mehrere Threads beibehalten möchten, müssen Sie diese verwenden Arc. Da Sie es zusätzlich mutieren möchten, müssen Sie a verwenden Mutex.
Wenn Sie nur innerhalb eines einzelnen Threads mutieren, können Sie alternativ auch a verwenden RwLock, wodurch Sie im Vergleich zu a Mutex
mehrere unveränderliche Referenzen gleichzeitig sperren und abrufen können.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Wenn Sie eine Methode hinzufügen, um data
in Form von erhalten zu können Arc<RwLock<WorkerData>>
. Wenn Sie dann das klonen Arc
und es (das Innere RwLock
) vor dem Aufruf sperren stop()
, würde dies zu einem Deadlock führen. Um dies zu vermeiden, sollte jede data()
Methode &WorkerData
oder &mut WorkerData
anstelle von zurückgeben Arc
. Auf diese Weise können Sie nicht anrufen stop()
und einen Deadlock verursachen.
Flagge, um Arbeiter zu stoppen
Wenn Sie den Worker-Thread tatsächlich stoppen möchten, müssen Sie ein Flag verwenden, um dies zu signalisieren. Sie können ein Flag in Form einer Freigabe erstellen AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Mehrere Threads und mehrere Aufgaben
Wenn Sie mehrere Arten von Aufgaben verarbeiten möchten, die auf mehrere Threads verteilt sind, finden Sie hier ein allgemeineres Beispiel.
Sie haben bereits die Verwendung erwähnt mpsc. Sie können also ein Senderund Receiverzusammen mit einem Brauch Task
und einer TaskResult
Aufzählung verwenden.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Ein Beispiel für die Verwendung von Worker
Aufgaben zusammen mit dem Senden und Empfangen von Aufgabenergebnissen würde dann folgendermaßen aussehen:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
In einigen Fällen müssen Sie möglicherweise Send
für Task
und / oder implementieren TaskResult
. Lesen Sie "Grundlegendes zum Senden" .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
Der Typparameter von a JoinHandle
sollte der Rückgabetyp der Thread-Funktion sein.
In diesem Fall ist der Rückgabetyp eine leere ()
, ausgesprochene Tupeleinheit . Es wird verwendet, wenn nur ein Wert möglich ist, und ist der implizite "Rückgabetyp" von Funktionen, wenn kein Rückgabetyp angegeben ist.
Sie können einfach schreiben, JoinHandle<()>
um darzustellen, dass die Funktion nichts zurückgibt.
(Hinweis: Bei Ihrem Code self.call()
treten einige Probleme mit dem Leihprüfer auf, mit denen wahrscheinlich Probleme gelöst werden müssen Arc<Mutex<Self>>
, aber das ist eine andere Frage.)