So starten und stoppen Sie einen Worker-Thread

Jan 03 2021

Ich habe eine folgende Anforderung, die in anderen Programmiersprachen Standard ist, aber ich weiß nicht, wie ich in Rust vorgehen soll.

Ich habe eine Klasse und möchte eine Methode schreiben, um einen Arbeitsthread zu erzeugen, der zwei Bedingungen erfüllt:

  • Nach dem Laichen des Worker-Threads wird die Funktion zurückgegeben (sodass ein anderer Ort nicht warten muss).
  • Es gibt einen Mechanismus zum Stoppen dieses Threads.

Hier ist zum Beispiel mein Dummy-Code:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

Ich habe einen Fehler bei thread: JoinHandle<?>. Was ist die Art des Threads in diesem Fall. Und ist mein Code korrekt, um einen Arbeitsthread zu starten und zu stoppen?

Antworten

3 vallentin Jan 03 2021 at 02:07

Kurz gesagt, das TIn- join()On JoinHandlegibt das Ergebnis des Abschlusses zurück, an das übergeben wurde thread::spawn(). In Ihrem Fall JoinHandle<?>müsste es also sein, JoinHandle<()>da Ihr Verschluss nichts zurückgibt , dh ()(Einheit) .

Abgesehen davon enthält Ihr Dummy-Code einige zusätzliche Probleme.

  • Der Rückgabetyp von run()ist falsch und müsste es zumindest sein Result<(), ()>.
  • Das threadFeld müsste Option<JoinHandle<()>in der Lage sein, mit fn stop(&mut self) dem join()Verbrauch umzugehen JoinHandle.
  • Sie versuchen jedoch, &mut selfzum Abschluss überzugehen, was viel mehr Probleme mit sich bringt und sich auf mehrere veränderbare Referenzen beschränkt
    • Dies könnte zB gelöst werden Mutex<A>. Wenn Sie jedoch anrufen stop(), kann dies stattdessen zu einem Deadlock führen.

Da es sich jedoch um Dummy-Code handelte, haben Sie dies in den Kommentaren klargestellt. Lassen Sie mich versuchen, anhand einiger Beispiele zu verdeutlichen, was Sie gemeint haben. Dazu gehört, dass ich Ihren Dummy-Code neu schreibe.

Ergebnis, nachdem der Arbeiter fertig ist

Wenn Sie keinen Zugriff auf die Daten benötigen, während der Arbeitsthread ausgeführt wird, können Sie einen neuen erstellen struct WorkerData. Dann run()kopieren / klonen Sie die Daten, von denen Sie sie benötigen A(oder wie ich sie umbenannt habe Worker). Dann kehren Sie im Verschluss endlich wieder datazurück, damit Sie es durch erwerben können join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Sie müssen nicht wirklich threadsein Option<JoinHandle<WorkerData>>und könnten stattdessen nur verwenden JoinHandle<WorkerData>>. Denn wenn Sie run()erneut aufrufen möchten, ist es einfacher, die Variable, die das enthält, neu zuzuweisen Worker.

So , jetzt können wir vereinfachen Worker, das Entfernen Optionund verändern stopzu konsumieren threadstatt, zusammen mit der Erstellung new() -> Selfstatt run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Geteilt WorkerData

Wenn Sie Verweise auf WorkerDatamehrere Threads beibehalten möchten, müssen Sie diese verwenden Arc. Da Sie es zusätzlich mutieren möchten, müssen Sie a verwenden Mutex.

Wenn Sie nur innerhalb eines einzelnen Threads mutieren, können Sie alternativ auch a verwenden RwLock, wodurch Sie im Vergleich zu a Mutexmehrere unveränderliche Referenzen gleichzeitig sperren und abrufen können.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Wenn Sie eine Methode hinzufügen, um datain Form von erhalten zu können Arc<RwLock<WorkerData>>. Wenn Sie dann das klonen Arcund es (das Innere RwLock) vor dem Aufruf sperren stop(), würde dies zu einem Deadlock führen. Um dies zu vermeiden, sollte jede data()Methode &WorkerDataoder &mut WorkerDataanstelle von zurückgeben Arc. Auf diese Weise können Sie nicht anrufen stop()und einen Deadlock verursachen.

Flagge, um Arbeiter zu stoppen

Wenn Sie den Worker-Thread tatsächlich stoppen möchten, müssen Sie ein Flag verwenden, um dies zu signalisieren. Sie können ein Flag in Form einer Freigabe erstellen AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Mehrere Threads und mehrere Aufgaben

Wenn Sie mehrere Arten von Aufgaben verarbeiten möchten, die auf mehrere Threads verteilt sind, finden Sie hier ein allgemeineres Beispiel.

Sie haben bereits die Verwendung erwähnt mpsc. Sie können also ein Senderund Receiverzusammen mit einem Brauch Taskund einer TaskResultAufzählung verwenden.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Ein Beispiel für die Verwendung von WorkerAufgaben zusammen mit dem Senden und Empfangen von Aufgabenergebnissen würde dann folgendermaßen aussehen:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

In einigen Fällen müssen Sie möglicherweise Sendfür Taskund / oder implementieren TaskResult. Lesen Sie "Grundlegendes zum Senden" .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

Der Typparameter von a JoinHandlesollte der Rückgabetyp der Thread-Funktion sein.

In diesem Fall ist der Rückgabetyp eine leere (), ausgesprochene Tupeleinheit . Es wird verwendet, wenn nur ein Wert möglich ist, und ist der implizite "Rückgabetyp" von Funktionen, wenn kein Rückgabetyp angegeben ist.

Sie können einfach schreiben, JoinHandle<()>um darzustellen, dass die Funktion nichts zurückgibt.

(Hinweis: Bei Ihrem Code self.call()treten einige Probleme mit dem Leihprüfer auf, mit denen wahrscheinlich Probleme gelöst werden müssen Arc<Mutex<Self>>, aber das ist eine andere Frage.)