Jak rozpocząć i zatrzymać wątek roboczy
Mam następujące wymaganie, które jest standardowe w innych językach programowania, ale nie wiem, jak to zrobić w Rust.
Mam klasę, chcę napisać metodę odradzania wątku roboczego, który spełniał 2 warunki:
- Po spawnowaniu wątku roboczego funkcja jest zwracana (więc inne miejsce nie musi czekać)
- Istnieje mechanizm zatrzymujący ten wątek.
Na przykład, oto mój fikcyjny kod:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
Mam błąd pod adresem thread: JoinHandle<?>
. Jaki jest rodzaj nici w tym przypadku. I czy mój kod jest poprawny, aby uruchamiać i zatrzymywać wątek roboczy?
Odpowiedzi
Krótko mówiąc, funkcja T
in join()na JoinHandleJoinHandle<?>
musiałoby być, JoinHandle<()>
ponieważ twoje zamknięcie nic nie zwraca , tj. ()(Jednostka) .
Poza tym twój fałszywy kod zawiera kilka dodatkowych problemów.
- Zwracany typ
run()
jest niepoprawny i przynajmniej powinien byćResult<(), ()>
. thread
Pola musiałaby byćOption<JoinHandle<()>
, aby móc obsłużyćfn stop(&mut self)
jak join()konsumujeJoinHandle
.- Jednak próbujesz przejść
&mut self
do zamknięcia, które powoduje znacznie więcej problemów, sprowadzając się do wielu zmiennych odwołań- Można to rozwiązać za pomocą np
Mutex<A>
. Jeśli jednak zadzwoniszstop()
, może to zamiast tego doprowadzić do impasu.
- Można to rozwiązać za pomocą np
Jednak ponieważ był to fałszywy kod, co wyjaśniłeś w komentarzach. Spróbuję wyjaśnić, co miałeś na myśli, na kilku przykładach. Obejmuje to mnie przepisanie twojego fikcyjnego kodu.
Wynik po zakończeniu pracy pracownika
Jeśli nie potrzebujesz dostępu do danych, gdy wątek roboczy jest uruchomiony, możesz utworzyć nowy struct WorkerData
. Następnie run()
skopiuj / sklonuj dane, z których potrzebujesz A
(lub tak jak je zmieniłem Worker
). Następnie w zamknięciu w końcu wracasz data
, abyś mógł go zdobyć join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Naprawdę nie musisz thread
być Option<JoinHandle<WorkerData>>
i zamiast tego możesz po prostu użyć JoinHandle<WorkerData>>
. Ponieważ gdybyś chciał zadzwonić run()
ponownie, po prostu łatwiej byłoby ponownie przypisać zmienną zawierającą Worker
.
Więc teraz możemy uprościć Worker
, usuwając Option
i zmieniając zamiast tego, stop
aby konsumować thread
, wraz z tworzeniem new() -> Self
zamiast run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Udostępnione WorkerData
Jeśli chcesz zachować odwołania do WorkerData
wielu wątków, musisz użyć Arc. Ponieważ dodatkowo chcesz mieć możliwość jego mutacji, musisz użyć pliku Mutex.
Jeśli będziesz mutować tylko w jednym wątku, możesz alternatywnie wykonać a RwLock, co w porównaniu z a Mutex
pozwoli ci zablokować i uzyskać wiele niezmiennych referencji w tym samym czasie.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Jeśli dodasz metodę, aby móc uzyskać data
w postaci Arc<RwLock<WorkerData>>
. Następnie, jeśli sklonujesz Arc
i zablokujesz go (wewnętrzną RwLock
) przed wywołaniem stop()
, spowoduje to zakleszczenie. Aby tego uniknąć, każda data()
metoda powinna zwrócić &WorkerData
lub &mut WorkerData
zamiast Arc
. W ten sposób nie będziesz w stanie zadzwonić stop()
i spowodować impasu.
Zgłoś zatrzymanie pracownika
Jeśli faktycznie chcesz zatrzymać wątek roboczy, musisz użyć flagi, aby to zrobić. Możesz stworzyć flagę w formie udostępnionej AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Wiele wątków i wiele zadań
Jeśli chcesz, aby przetwarzano wiele rodzajów zadań, podzielonych na wiele wątków, oto bardziej uogólniony przykład.
Wspomniałeś już o używaniu mpsc. Możesz więc używać Sendera Receiverwraz z niestandardowym Task
i TaskResult
wyliczeniem.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Przykład użycia Worker
wraz z wysyłaniem zadań i otrzymywaniem wyników zadań wyglądałby wtedy następująco:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
W kilku przypadkach może być konieczne wdrożenie Send
dla Task
i / lub TaskResult
. Zobacz „Zrozumienie cechy wysyłania” .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
Parametr typu a JoinHandle
powinien być typem zwracanym przez funkcję wątku.
W tym przypadku zwracanym typem jest pusta krotka ()
, jednostka wymawiana . Jest używany, gdy możliwa jest tylko jedna wartość, i jest niejawnym „typem zwracania” funkcji, gdy nie określono żadnego typu zwracanego.
Możesz po prostu napisać, JoinHandle<()>
aby zaznaczyć, że funkcja nic nie zwróci.
(Uwaga: w Twoim kodzie będą występować pewne problemy z narzędziem do sprawdzania wypożyczeń self.call()
, które prawdopodobnie będą wymagały rozwiązania za pomocą Arc<Mutex<Self>>
, ale to już inna kwestia).