Como iniciar e parar um thread de trabalho

Jan 03 2021

Eu tenho o seguinte requisito que é padrão em outras linguagens de programação, mas não sei como fazer no Rust.

Eu tenho uma classe, quero escrever um método para gerar um thread de trabalho que satisfaça 2 condições:

  • Depois de gerar o thread de trabalho, a função retorna (para que outro lugar não precise esperar)
  • Existe um mecanismo para interromper este segmento.

Por exemplo, aqui está meu código fictício:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

Eu tenho um erro em thread: JoinHandle<?>. Qual é o tipo de thread neste caso. E meu código está correto para iniciar e interromper um thread de trabalho?

Respostas

3 vallentin Jan 03 2021 at 02:07

Resumindo, o Tin join()em a JoinHandleretorna o resultado do fechamento passado para thread::spawn(). Portanto, no seu caso JoinHandle<?>precisaria ser, JoinHandle<()>pois o fechamento não retorna nada , ou seja, ()(unidade) .

Além disso, seu código fictício contém alguns problemas adicionais.

  • O tipo de retorno de run()está incorreto e deveria pelo menos ser Result<(), ()>.
  • O threadcampo precisa Option<JoinHandle<()>ser capaz de lidar com o fn stop(&mut self) que join()consome JoinHandle.
  • No entanto, você está tentando passar &mut selfpara o encerramento, o que traz muito mais problemas, resumindo-se a várias referências mutáveis
    • Isso poderia ser resolvido com, por exemplo Mutex<A>. No entanto, se você ligar stop(), isso pode levar a um deadlock.

Porém, já que era um código fictício, e você esclareceu nos comentários. Deixe-me tentar esclarecer o que você quis dizer com alguns exemplos. Isso inclui reescrever seu código fictício.

Resultado depois que o trabalhador terminar

Se você não precisa acessar os dados enquanto o thread de trabalho está em execução, você pode fazer um novo struct WorkerData. Em seguida, run()você copia / clona os dados de que precisa A(ou como eu os renomei Worker). Então, no fechamento, você finalmente retorna datanovamente, para que possa adquiri-lo completamente join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Você realmente não precisa threadser Option<JoinHandle<WorkerData>>e, em vez disso, pode apenas usar JoinHandle<WorkerData>>. Porque se você quisesse chamar run()novamente, seria mais fácil reatribuir a variável que contém o Worker.

Portanto, agora podemos simplificar Worker, removendo Optione alterando stoppara consumir thread, juntamente com a criação new() -> Selfno lugar de run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Compartilhado WorkerData

Se você quiser reter as referências WorkerDataentre vários threads, precisará usar Arc. Já que você também deseja poder alterá-lo, você precisará usar um Mutex.

Se você apenas sofrerá mutações em um único thread, poderá alternativamente, você a RwLock, que, em comparação com a Mutex, permitirá que você bloqueie e obtenha várias referências imutáveis ​​ao mesmo tempo.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Se você adicionar um método para poder obter datana forma de Arc<RwLock<WorkerData>>. Então, se você clonar Arce bloquear (o interno RwLock) antes de chamar stop(), isso resultará em um deadlock. Para evitar isso, qualquer data()método deve retornar &WorkerDataou em &mut WorkerDatavez do Arc. Dessa forma, você não conseguiria ligar stop()e causar um deadlock.

Sinalizar para parar o trabalhador

Se você realmente deseja interromper o thread de trabalho, deverá usar um sinalizador para sinalizá-lo. Você pode criar um sinalizador na forma de um compartilhado AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Múltiplos threads e múltiplas tarefas

Se você deseja que vários tipos de tarefas sejam processadas, espalhadas por vários threads, aqui está um exemplo mais generalizado.

Você já mencionou o uso mpsc. Portanto, você pode usar um Sendere Receiverjunto com um custom Taske TaskResultenum.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Um exemplo de uso de Workerjunto com o envio de tarefas e recebimento de resultados de tarefas ficaria assim:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

Em alguns casos, você pode precisar implementar Sendpara Taske / ou TaskResult. Confira "Compreendendo o traço Send" .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

O parâmetro de tipo de a JoinHandledeve ser o tipo de retorno da função do segmento.

Nesse caso, o tipo de retorno é uma tupla vazia (), unidade pronunciada . É usado quando há apenas um valor possível e é o "tipo de retorno" implícito de funções quando nenhum tipo de retorno é especificado.

Você pode apenas escrever JoinHandle<()>para representar que a função não retornará nada.

(Observação: seu código terá alguns problemas com o verificador de empréstimo self.call(), que provavelmente precisarão ser resolvidos Arc<Mutex<Self>>, mas essa é outra questão.)