Cara memulai dan menghentikan utas pekerja

Jan 03 2021

Saya memiliki persyaratan berikut yang merupakan standar dalam bahasa pemrograman lain tetapi saya tidak tahu bagaimana melakukannya di Rust.

Saya memiliki kelas, saya ingin menulis metode untuk menelurkan utas pekerja yang memenuhi 2 kondisi:

  • Setelah memunculkan thread pekerja, fungsinya adalah kembali (jadi tempat lain tidak perlu menunggu)
  • Ada mekanisme untuk menghentikan utas ini.

Misalnya, berikut adalah kode dummy saya:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

Saya mengalami kesalahan di thread: JoinHandle<?>. Apa jenis utas dalam kasus ini. Dan apakah kode saya benar untuk memulai dan menghentikan thread pekerja?

Jawaban

3 vallentin Jan 03 2021 at 02:07

Singkatnya, Tin join()on a JoinHandlemengembalikan hasil closure yang diteruskan thread::spawn(). Jadi dalam kasus Anda JoinHandle<?>akan perlu JoinHandle<()>karena closure Anda tidak menghasilkan apa-apa , yaitu ()(unit) .

Selain itu, kode dummy Anda berisi beberapa masalah tambahan.

  • Jenis kembalian run()salah, dan setidaknya harus Result<(), ()>.
  • The threadlapangan akan perlu Option<JoinHandle<()>untuk dapat menangani fn stop(&mut self) sebagai join()mengkonsumsi yang JoinHandle.
  • Namun, Anda mencoba untuk lolos &mut selfke closure, yang membawa lebih banyak masalah, bermuara pada beberapa referensi yang bisa berubah
    • Ini bisa diselesaikan dengan mis Mutex<A>. Namun, jika Anda menelepon stop()maka itu bisa menyebabkan kebuntuan.

Namun, karena itu adalah kode tiruan, dan Anda mengklarifikasi di komentar. Izinkan saya mencoba menjelaskan apa yang Anda maksud dengan beberapa contoh. Ini termasuk saya menulis ulang kode dummy Anda.

Hasil setelah pekerja selesai

Jika Anda tidak memerlukan akses ke data saat thread pekerja sedang berjalan, Anda dapat membuat yang baru struct WorkerData. Kemudian run()Anda menyalin / mengkloning data yang Anda butuhkan A(atau seperti yang telah saya ubah namanya Worker). Kemudian di penutupan Anda akhirnya kembali datalagi, sehingga Anda dapat memperolehnya melalui join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Anda tidak benar-benar perlu threadmenjadi Option<JoinHandle<WorkerData>>dan sebagai gantinya bisa langsung menggunakan JoinHandle<WorkerData>>. Karena jika Anda ingin memanggil run()lagi, akan lebih mudah untuk menetapkan kembali variabel yang menahan Worker.

Jadi sekarang kita bisa menyederhanakan Worker, menghapus Optiondan mengubah stopuntuk dikonsumsi threadsebagai gantinya, bersama dengan membuat new() -> Selfdi tempat run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Bersama WorkerData

Jika Anda ingin mempertahankan referensi di WorkerDataantara beberapa utas, Anda harus menggunakan Arc. Karena Anda juga ingin dapat memutasinya, Anda harus menggunakan file Mutex.

Jika Anda hanya akan bermutasi dalam satu utas, maka Anda dapat memilih a RwLock, yang dibandingkan dengan a Mutexakan memungkinkan Anda untuk mengunci dan mendapatkan beberapa referensi yang tidak dapat diubah pada saat yang bersamaan.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Jika Anda menambahkan metode untuk mendapatkan datadalam bentuk Arc<RwLock<WorkerData>>. Kemudian jika Anda mengkloning Arcdan menguncinya (bagian dalam RwLock) sebelum menelepon stop(), maka itu akan mengakibatkan kebuntuan. Untuk menghindarinya, data()metode apa pun harus mengembalikan &WorkerDataatau &mut WorkerDatasebagai pengganti Arc. Dengan cara itu Anda tidak dapat menelepon stop()dan menyebabkan kebuntuan.

Tandai untuk menghentikan pekerja

Jika Anda benar-benar ingin menghentikan utas pekerja, Anda harus menggunakan bendera untuk memberi isyarat agar melakukannya. Anda dapat membuat bendera dalam bentuk bersama AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Banyak utas dan banyak tugas

Jika Anda ingin beberapa jenis tugas diproses, tersebar di beberapa utas, berikut adalah contoh yang lebih umum.

Anda sudah menyebutkan menggunakan mpsc. Jadi Anda bisa menggunakan a Senderdan Receiverbersama dengan custom Taskdan TaskResultenum.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Contoh penggunaan Workerbersama dengan mengirim tugas dan menerima hasil tugas, akan terlihat seperti ini:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

Dalam beberapa kasus, Anda mungkin perlu menerapkan Senduntuk Taskdan / atau TaskResult. Lihat "Memahami sifat Kirim" .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

Parameter tipe a JoinHandleharuslah tipe kembalian dari fungsi utas.

Dalam hal ini, tipe yang dikembalikan adalah tupel kosong (), unit diucapkan . Ini digunakan ketika hanya ada satu nilai yang mungkin, dan merupakan "tipe pengembalian" fungsi yang implisit ketika tidak ada tipe pengembalian yang ditentukan.

Anda bisa menulis JoinHandle<()>untuk menyatakan bahwa fungsi tersebut tidak akan mengembalikan apa pun.

(Catatan: Kode Anda akan mengalami beberapa masalah peminjam pemeriksa self.call(), yang mungkin perlu diselesaikan dengan Arc<Mutex<Self>>, tapi itu pertanyaan lain.)