Как запустить и остановить рабочий поток
У меня есть следующее требование, стандартное для других языков программирования, но я не знаю, как это сделать в Rust.
У меня есть класс, я хочу написать метод для создания рабочего потока, удовлетворяющего 2 условиям:
- После создания рабочего потока функция возвращается (так что в другом месте ждать не нужно)
- Есть механизм остановки этого потока.
Например, вот мой фиктивный код:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
У меня ошибка thread: JoinHandle<?>
. Какой тип резьбы в данном случае. И правильный ли мой код для запуска и остановки рабочего потока?
Ответы
Короче говоря, T
in join()on a JoinHandleJoinHandle<?>
это должно быть, JoinHandle<()>
поскольку ваше закрытие ничего не возвращает , т.е. ()(единица) .
Помимо этого, ваш фиктивный код содержит несколько дополнительных проблем.
- Тип возврата
run()
неверен, по крайней мере, должен бытьResult<(), ()>
. thread
Поле необходимо будетOption<JoinHandle<()>
иметь возможность обрабатыватьfn stop(&mut self)
, как join()пожираетJoinHandle
.- Однако вы пытаетесь перейти
&mut self
к закрытию, что вызывает гораздо больше проблем, сводящихся к нескольким изменяемым ссылкам.- Это можно решить, например, с помощью
Mutex<A>
. Однако, если вы позвоните,stop()
это может привести к тупиковой ситуации.
- Это можно решить, например, с помощью
Однако, поскольку это был фиктивный код, и вы уточнили его в комментариях. Позвольте мне попытаться пояснить, что вы имели в виду, на нескольких примерах. Это включает в себя переписывание вашего фиктивного кода.
Результат после того, как рабочий закончен
Если вам не нужен доступ к данным во время работы рабочего потока, вы можете создать новый struct WorkerData
. Затем run()
вы копируете / клонируете нужные вам данные A
(или как я их переименовал Worker
). Затем в закрытии вы, наконец, data
снова возвращаетесь , чтобы вы могли получить это через join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Вам на самом деле не нужно thread
быть, Option<JoinHandle<WorkerData>>
и вместо этого вы можете просто использовать JoinHandle<WorkerData>>
. Потому что, если бы вы захотели вызвать run()
снова, было бы проще переназначить переменную, содержащую Worker
.
Итак, теперь мы можем упростить Worker
, удалив Option
и изменить, stop
чтобы потреблять thread
, а также создать new() -> Self
вместо run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Общий WorkerData
Если вы хотите сохранить ссылки WorkerData
между несколькими потоками, вам нужно будет использовать Arc. Поскольку вы дополнительно хотите иметь возможность его видоизменять, вам необходимо использовать файл Mutex.
Если вы будете изменять только в одном потоке, то в качестве альтернативы вы можете использовать a RwLock, который по сравнению с a Mutex
позволит вам блокировать и получать несколько неизменяемых ссылок одновременно.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Если вы добавите метод, который можно будет получить data
в виде Arc<RwLock<WorkerData>>
. Затем, если вы клонируете Arc
и заблокируете его (внутренний RwLock
) перед вызовом stop()
, это приведет к тупиковой ситуации. Чтобы этого избежать, любой data()
метод должен возвращать &WorkerData
или &mut WorkerData
вместо Arc
. Таким образом, вы не сможете позвонить stop()
и вызвать тупик.
Отметить, чтобы остановить работника
Если вы действительно хотите остановить рабочий поток, вам нужно будет использовать флаг, чтобы сообщить ему об этом. Вы можете создать флаг в виде расшаренного AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Несколько потоков и несколько задач
Если вы хотите, чтобы несколько типов задач обрабатывались и распределялись по нескольким потокам, то вот более общий пример.
Вы уже упоминали об использовании mpsc. Таким образом, вы можете использовать Senderи Receiverвместе с custom Task
и TaskResult
enum.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Пример использования Worker
вместе с отправкой задач и получением результатов задач будет выглядеть следующим образом:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
В некоторых случаях вам может потребоваться реализовать Send
for Task
и / или TaskResult
. Ознакомьтесь с разделом «Понимание особенности отправки» .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
Параметр типа JoinHandle
должен быть типом, возвращаемым функцией потока.
В этом случае возвращаемый тип - пустой кортеж ()
, произносится как unit . Он используется, когда возможно только одно значение, и является неявным «типом возврата» функций, когда тип возврата не указан.
Вы можете просто написать, JoinHandle<()>
чтобы представить, что функция ничего не вернет.
(Примечание: ваш код столкнется с некоторыми проблемами с проверкой заимствований self.call()
, которые, вероятно, придется решить Arc<Mutex<Self>>
, но это другой вопрос.)