Jak rozpocząć i zatrzymać wątek roboczy

Jan 03 2021

Mam następujące wymaganie, które jest standardowe w innych językach programowania, ale nie wiem, jak to zrobić w Rust.

Mam klasę, chcę napisać metodę odradzania wątku roboczego, który spełniał 2 warunki:

  • Po spawnowaniu wątku roboczego funkcja jest zwracana (więc inne miejsce nie musi czekać)
  • Istnieje mechanizm zatrzymujący ten wątek.

Na przykład, oto mój fikcyjny kod:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

Mam błąd pod adresem thread: JoinHandle<?>. Jaki jest rodzaj nici w tym przypadku. I czy mój kod jest poprawny, aby uruchamiać i zatrzymywać wątek roboczy?

Odpowiedzi

3 vallentin Jan 03 2021 at 02:07

Krótko mówiąc, funkcja Tin join()na JoinHandlezwraca wynik zamknięcia przekazanego do thread::spawn(). Więc w twoim przypadku JoinHandle<?>musiałoby być, JoinHandle<()>ponieważ twoje zamknięcie nic nie zwraca , tj. ()(Jednostka) .

Poza tym twój fałszywy kod zawiera kilka dodatkowych problemów.

  • Zwracany typ run()jest niepoprawny i przynajmniej powinien być Result<(), ()>.
  • threadPola musiałaby być Option<JoinHandle<()>, aby móc obsłużyć fn stop(&mut self) jak join()konsumuje JoinHandle.
  • Jednak próbujesz przejść &mut selfdo zamknięcia, które powoduje znacznie więcej problemów, sprowadzając się do wielu zmiennych odwołań
    • Można to rozwiązać za pomocą np Mutex<A>. Jeśli jednak zadzwonisz stop(), może to zamiast tego doprowadzić do impasu.

Jednak ponieważ był to fałszywy kod, co wyjaśniłeś w komentarzach. Spróbuję wyjaśnić, co miałeś na myśli, na kilku przykładach. Obejmuje to mnie przepisanie twojego fikcyjnego kodu.

Wynik po zakończeniu pracy pracownika

Jeśli nie potrzebujesz dostępu do danych, gdy wątek roboczy jest uruchomiony, możesz utworzyć nowy struct WorkerData. Następnie run()skopiuj / sklonuj dane, z których potrzebujesz A(lub tak jak je zmieniłem Worker). Następnie w zamknięciu w końcu wracasz data, abyś mógł go zdobyć join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Naprawdę nie musisz threadbyć Option<JoinHandle<WorkerData>>i zamiast tego możesz po prostu użyć JoinHandle<WorkerData>>. Ponieważ gdybyś chciał zadzwonić run()ponownie, po prostu łatwiej byłoby ponownie przypisać zmienną zawierającą Worker.

Więc teraz możemy uprościć Worker, usuwając Optioni zmieniając zamiast tego, stopaby konsumować thread, wraz z tworzeniem new() -> Selfzamiast run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Udostępnione WorkerData

Jeśli chcesz zachować odwołania do WorkerDatawielu wątków, musisz użyć Arc. Ponieważ dodatkowo chcesz mieć możliwość jego mutacji, musisz użyć pliku Mutex.

Jeśli będziesz mutować tylko w jednym wątku, możesz alternatywnie wykonać a RwLock, co w porównaniu z a Mutexpozwoli ci zablokować i uzyskać wiele niezmiennych referencji w tym samym czasie.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Jeśli dodasz metodę, aby móc uzyskać dataw postaci Arc<RwLock<WorkerData>>. Następnie, jeśli sklonujesz Arci zablokujesz go (wewnętrzną RwLock) przed wywołaniem stop(), spowoduje to zakleszczenie. Aby tego uniknąć, każda data()metoda powinna zwrócić &WorkerDatalub &mut WorkerDatazamiast Arc. W ten sposób nie będziesz w stanie zadzwonić stop()i spowodować impasu.

Zgłoś zatrzymanie pracownika

Jeśli faktycznie chcesz zatrzymać wątek roboczy, musisz użyć flagi, aby to zrobić. Możesz stworzyć flagę w formie udostępnionej AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Wiele wątków i wiele zadań

Jeśli chcesz, aby przetwarzano wiele rodzajów zadań, podzielonych na wiele wątków, oto bardziej uogólniony przykład.

Wspomniałeś już o używaniu mpsc. Możesz więc używać Sendera Receiverwraz z niestandardowym Taski TaskResultwyliczeniem.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Przykład użycia Workerwraz z wysyłaniem zadań i otrzymywaniem wyników zadań wyglądałby wtedy następująco:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

W kilku przypadkach może być konieczne wdrożenie Senddla Taski / lub TaskResult. Zobacz „Zrozumienie cechy wysyłania” .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

Parametr typu a JoinHandlepowinien być typem zwracanym przez funkcję wątku.

W tym przypadku zwracanym typem jest pusta krotka (), jednostka wymawiana . Jest używany, gdy możliwa jest tylko jedna wartość, i jest niejawnym „typem zwracania” funkcji, gdy nie określono żadnego typu zwracanego.

Możesz po prostu napisać, JoinHandle<()>aby zaznaczyć, że funkcja nic nie zwróci.

(Uwaga: w Twoim kodzie będą występować pewne problemy z narzędziem do sprawdzania wypożyczeń self.call(), które prawdopodobnie będą wymagały rozwiązania za pomocą Arc<Mutex<Self>>, ale to już inna kwestia).