Cómo iniciar y detener un hilo de trabajo

Jan 03 2021

Tengo un siguiente requisito que es estándar en otros lenguajes de programación, pero no sé cómo hacerlo en Rust.

Tengo una clase, quiero escribir un método para generar un hilo de trabajo que cumpla con 2 condiciones:

  • Después de generar el hilo de trabajo, la función se devuelve (por lo que en otro lugar no es necesario esperar)
  • Existe un mecanismo para detener este hilo.

Por ejemplo, aquí está mi código ficticio:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

Tengo un error en thread: JoinHandle<?>. ¿Cuál es el tipo de hilo en este caso? ¿Y mi código es correcto para iniciar y detener un hilo de trabajo?

Respuestas

3 vallentin Jan 03 2021 at 02:07

En resumen, el Tin join()on a JoinHandledevuelve el resultado del cierre al que se pasó thread::spawn(). Entonces, en su caso JoinHandle<?>, debería ser JoinHandle<()>ya que su cierre no devuelve nada , es decir, ()(unidad) .

Aparte de eso, su código ficticio contiene algunos problemas adicionales.

  • El tipo de retorno de run()es incorrecto y, al menos, debería serlo Result<(), ()>.
  • El threadcampo debería Option<JoinHandle<()>poder manejar fn stop(&mut self) como join()consume el JoinHandle.
  • Sin embargo, está intentando pasar &mut selfal cierre, lo que trae muchos más problemas, reduciéndose a múltiples referencias mutables.
    • Esto podría resolverse con, por ejemplo Mutex<A>. Sin embargo, si llama stop(), eso podría conducir a un punto muerto.

Sin embargo, dado que era un código ficticio y lo aclaró en los comentarios. Déjame intentar aclarar lo que quisiste decir con algunos ejemplos. Esto me incluye reescribir su código ficticio.

Resultado después de que el trabajador haya terminado

Si no necesita acceder a los datos mientras se ejecuta el subproceso de trabajo, puede crear un nuevo struct WorkerData. Luego run(), copia / clona los datos que necesitas A(o como lo renombré Worker). Luego, en el cierre, finalmente regresa de datanuevo, para que pueda adquirirlo a través join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Realmente no necesita threadserlo Option<JoinHandle<WorkerData>>y en su lugar podría usar JoinHandle<WorkerData>>. Porque si quisiera run()volver a llamar , sería más fácil reasignar la variable que contiene el Worker.

Así que ahora podemos simplificar Worker, eliminar Optiony cambiar stopa consumir en su threadlugar, junto con crear new() -> Selfen lugar de run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Compartido WorkerData

Si desea conservar referencias WorkerDataentre varios subprocesos, deberá usar Arc. Dado que también desea poder mutarlo, deberá usar un archivo Mutex.

Si solo va a mutar dentro de un solo hilo, entonces podría alternativamente usar a RwLock, que en comparación con a Mutexle permitirá bloquear y obtener múltiples referencias inmutables al mismo tiempo.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Si agrega un método para poder obtener dataen forma de Arc<RwLock<WorkerData>>. Luego, si clona el Arcy lo bloquea (el interno RwLock) antes de llamar stop(), eso resultaría en un punto muerto. Para evitar eso, cualquier data()método debe devolver &WorkerDatao en &mut WorkerDatalugar de Arc. De esa forma, no podrá llamar stop()y provocar un punto muerto.

Bandera para detener trabajador

Si realmente desea detener el hilo de trabajo, entonces tendrá que usar una bandera para indicarle que lo haga. Puede crear una bandera en forma de archivo compartido AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Múltiples hilos y múltiples tareas

Si desea que se procesen varios tipos de tareas, distribuidas en varios subprocesos, aquí hay un ejemplo más generalizado.

Ya mencionaste usar mpsc. Por lo tanto, puede usar un Sendery Receiverjunto con un personalizado Tasky una TaskResultenumeración.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Un ejemplo del uso de Workerjunto con el envío de tareas y la recepción de resultados de tareas se vería así:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

En algunos casos, es posible que deba implementar Sendfor Tasky / o TaskResult. Consulte "Comprensión del rasgo de envío" .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

El parámetro de tipo de a JoinHandledebe ser el tipo de retorno de la función del hilo.

En este caso, el tipo de retorno es una tupla vacía (), unidad pronunciada . Se utiliza cuando solo hay un valor posible y es el "tipo de retorno" implícito de las funciones cuando no se especifica ningún tipo de retorno.

Puede escribir JoinHandle<()>para representar que la función no devolverá nada.

(Nota: su código se encontrará con algunos problemas con el comprobador de préstamos self.call(), que probablemente deberán resolverse Arc<Mutex<Self>>, pero esa es otra pregunta).