Cómo iniciar y detener un hilo de trabajo
Tengo un siguiente requisito que es estándar en otros lenguajes de programación, pero no sé cómo hacerlo en Rust.
Tengo una clase, quiero escribir un método para generar un hilo de trabajo que cumpla con 2 condiciones:
- Después de generar el hilo de trabajo, la función se devuelve (por lo que en otro lugar no es necesario esperar)
- Existe un mecanismo para detener este hilo.
Por ejemplo, aquí está mi código ficticio:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
Tengo un error en thread: JoinHandle<?>
. ¿Cuál es el tipo de hilo en este caso? ¿Y mi código es correcto para iniciar y detener un hilo de trabajo?
Respuestas
En resumen, el T
in join()on a JoinHandleJoinHandle<?>
, debería ser JoinHandle<()>
ya que su cierre no devuelve nada , es decir, ()(unidad) .
Aparte de eso, su código ficticio contiene algunos problemas adicionales.
- El tipo de retorno de
run()
es incorrecto y, al menos, debería serloResult<(), ()>
. - El
thread
campo deberíaOption<JoinHandle<()>
poder manejarfn stop(&mut self)
como join()consume elJoinHandle
. - Sin embargo, está intentando pasar
&mut self
al cierre, lo que trae muchos más problemas, reduciéndose a múltiples referencias mutables.- Esto podría resolverse con, por ejemplo
Mutex<A>
. Sin embargo, si llamastop()
, eso podría conducir a un punto muerto.
- Esto podría resolverse con, por ejemplo
Sin embargo, dado que era un código ficticio y lo aclaró en los comentarios. Déjame intentar aclarar lo que quisiste decir con algunos ejemplos. Esto me incluye reescribir su código ficticio.
Resultado después de que el trabajador haya terminado
Si no necesita acceder a los datos mientras se ejecuta el subproceso de trabajo, puede crear un nuevo struct WorkerData
. Luego run()
, copia / clona los datos que necesitas A
(o como lo renombré Worker
). Luego, en el cierre, finalmente regresa de data
nuevo, para que pueda adquirirlo a través join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Realmente no necesita thread
serlo Option<JoinHandle<WorkerData>>
y en su lugar podría usar JoinHandle<WorkerData>>
. Porque si quisiera run()
volver a llamar , sería más fácil reasignar la variable que contiene el Worker
.
Así que ahora podemos simplificar Worker
, eliminar Option
y cambiar stop
a consumir en su thread
lugar, junto con crear new() -> Self
en lugar de run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Compartido WorkerData
Si desea conservar referencias WorkerData
entre varios subprocesos, deberá usar Arc. Dado que también desea poder mutarlo, deberá usar un archivo Mutex.
Si solo va a mutar dentro de un solo hilo, entonces podría alternativamente usar a RwLock, que en comparación con a Mutex
le permitirá bloquear y obtener múltiples referencias inmutables al mismo tiempo.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Si agrega un método para poder obtener data
en forma de Arc<RwLock<WorkerData>>
. Luego, si clona el Arc
y lo bloquea (el interno RwLock
) antes de llamar stop()
, eso resultaría en un punto muerto. Para evitar eso, cualquier data()
método debe devolver &WorkerData
o en &mut WorkerData
lugar de Arc
. De esa forma, no podrá llamar stop()
y provocar un punto muerto.
Bandera para detener trabajador
Si realmente desea detener el hilo de trabajo, entonces tendrá que usar una bandera para indicarle que lo haga. Puede crear una bandera en forma de archivo compartido AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Múltiples hilos y múltiples tareas
Si desea que se procesen varios tipos de tareas, distribuidas en varios subprocesos, aquí hay un ejemplo más generalizado.
Ya mencionaste usar mpsc. Por lo tanto, puede usar un Sendery Receiverjunto con un personalizado Task
y una TaskResult
enumeración.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Un ejemplo del uso de Worker
junto con el envío de tareas y la recepción de resultados de tareas se vería así:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
En algunos casos, es posible que deba implementar Send
for Task
y / o TaskResult
. Consulte "Comprensión del rasgo de envío" .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
El parámetro de tipo de a JoinHandle
debe ser el tipo de retorno de la función del hilo.
En este caso, el tipo de retorno es una tupla vacía ()
, unidad pronunciada . Se utiliza cuando solo hay un valor posible y es el "tipo de retorno" implícito de las funciones cuando no se especifica ningún tipo de retorno.
Puede escribir JoinHandle<()>
para representar que la función no devolverá nada.
(Nota: su código se encontrará con algunos problemas con el comprobador de préstamos self.call()
, que probablemente deberán resolverse Arc<Mutex<Self>>
, pero esa es otra pregunta).