Как запустить и остановить рабочий поток

Jan 03 2021

У меня есть следующее требование, стандартное для других языков программирования, но я не знаю, как это сделать в Rust.

У меня есть класс, я хочу написать метод для создания рабочего потока, удовлетворяющего 2 условиям:

  • После создания рабочего потока функция возвращается (так что в другом месте ждать не нужно)
  • Есть механизм остановки этого потока.

Например, вот мой фиктивный код:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

У меня ошибка thread: JoinHandle<?>. Какой тип резьбы в данном случае. И правильный ли мой код для запуска и остановки рабочего потока?

Ответы

3 vallentin Jan 03 2021 at 02:07

Короче говоря, Tin join()on a JoinHandleвозвращает результат закрытия, которому он был передан thread::spawn(). Итак, в вашем случае JoinHandle<?>это должно быть, JoinHandle<()>поскольку ваше закрытие ничего не возвращает , т.е. ()(единица) .

Помимо этого, ваш фиктивный код содержит несколько дополнительных проблем.

  • Тип возврата run()неверен, по крайней мере, должен быть Result<(), ()>.
  • threadПоле необходимо будет Option<JoinHandle<()>иметь возможность обрабатывать fn stop(&mut self) , как join()пожирает JoinHandle.
  • Однако вы пытаетесь перейти &mut selfк закрытию, что вызывает гораздо больше проблем, сводящихся к нескольким изменяемым ссылкам.
    • Это можно решить, например, с помощью Mutex<A>. Однако, если вы позвоните, stop()это может привести к тупиковой ситуации.

Однако, поскольку это был фиктивный код, и вы уточнили его в комментариях. Позвольте мне попытаться пояснить, что вы имели в виду, на нескольких примерах. Это включает в себя переписывание вашего фиктивного кода.

Результат после того, как рабочий закончен

Если вам не нужен доступ к данным во время работы рабочего потока, вы можете создать новый struct WorkerData. Затем run()вы копируете / клонируете нужные вам данные A(или как я их переименовал Worker). Затем в закрытии вы, наконец, dataснова возвращаетесь , чтобы вы могли получить это через join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Вам на самом деле не нужно threadбыть, Option<JoinHandle<WorkerData>>и вместо этого вы можете просто использовать JoinHandle<WorkerData>>. Потому что, если бы вы захотели вызвать run()снова, было бы проще переназначить переменную, содержащую Worker.

Итак, теперь мы можем упростить Worker, удалив Optionи изменить, stopчтобы потреблять thread, а также создать new() -> Selfвместо run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Общий WorkerData

Если вы хотите сохранить ссылки WorkerDataмежду несколькими потоками, вам нужно будет использовать Arc. Поскольку вы дополнительно хотите иметь возможность его видоизменять, вам необходимо использовать файл Mutex.

Если вы будете изменять только в одном потоке, то в качестве альтернативы вы можете использовать a RwLock, который по сравнению с a Mutexпозволит вам блокировать и получать несколько неизменяемых ссылок одновременно.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Если вы добавите метод, который можно будет получить dataв виде Arc<RwLock<WorkerData>>. Затем, если вы клонируете Arcи заблокируете его (внутренний RwLock) перед вызовом stop(), это приведет к тупиковой ситуации. Чтобы этого избежать, любой data()метод должен возвращать &WorkerDataили &mut WorkerDataвместо Arc. Таким образом, вы не сможете позвонить stop()и вызвать тупик.

Отметить, чтобы остановить работника

Если вы действительно хотите остановить рабочий поток, вам нужно будет использовать флаг, чтобы сообщить ему об этом. Вы можете создать флаг в виде расшаренного AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Несколько потоков и несколько задач

Если вы хотите, чтобы несколько типов задач обрабатывались и распределялись по нескольким потокам, то вот более общий пример.

Вы уже упоминали об использовании mpsc. Таким образом, вы можете использовать Senderи Receiverвместе с custom Taskи TaskResultenum.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Пример использования Workerвместе с отправкой задач и получением результатов задач будет выглядеть следующим образом:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

В некоторых случаях вам может потребоваться реализовать Sendfor Taskи / или TaskResult. Ознакомьтесь с разделом «Понимание особенности отправки» .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

Параметр типа JoinHandleдолжен быть типом, возвращаемым функцией потока.

В этом случае возвращаемый тип - пустой кортеж (), произносится как unit . Он используется, когда возможно только одно значение, и является неявным «типом возврата» функций, когда тип возврата не указан.

Вы можете просто написать, JoinHandle<()>чтобы представить, что функция ничего не вернет.

(Примечание: ваш код столкнется с некоторыми проблемами с проверкой заимствований self.call(), которые, вероятно, придется решить Arc<Mutex<Self>>, но это другой вопрос.)