วิธีเริ่มและหยุดเธรดผู้ปฏิบัติงาน

Jan 03 2021

ฉันมีข้อกำหนดต่อไปนี้ซึ่งเป็นมาตรฐานในภาษาโปรแกรมอื่น ๆ แต่ฉันไม่รู้ว่าจะทำอย่างไรใน Rust

ฉันมีคลาสฉันต้องการเขียนวิธีการสร้างเธรดผู้ปฏิบัติงานที่ตรงตามเงื่อนไข 2 ประการ:

  • หลังจากวางไข่เธรดผู้ปฏิบัติงานฟังก์ชันจะถูกส่งกลับ (ดังนั้นที่อื่นไม่จำเป็นต้องรอ)
  • มีกลไกในการหยุดเธรดนี้

ตัวอย่างเช่นนี่คือรหัสจำลองของฉัน:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

thread: JoinHandle<?>ผมมีข้อผิดพลาดที่ ประเภทของเธรดในกรณีนี้คืออะไร และรหัสของฉันถูกต้องในการเริ่มและหยุดเธรดผู้ปฏิบัติงานหรือไม่

คำตอบ

3 vallentin Jan 03 2021 at 02:07

ในระยะสั้นTในjoin()ในผลตอบแทนที่ผลของการปิดผ่านไปJoinHandle ดังนั้นในกรณีของคุณจะต้องเป็นผลตอบแทนที่ปิดของคุณอะไรคือ(หน่วย)thread::spawn()JoinHandle<?>JoinHandle<()>()

นอกเหนือจากนั้นโค้ดจำลองของคุณยังมีปัญหาเพิ่มเติมอีกเล็กน้อย

  • ประเภทการกลับมาของไม่ถูกต้องและจะต้องไปอย่างน้อยที่สุดrun()Result<(), ()>
  • threadฟิลด์จะต้องมีOption<JoinHandle<()>เพื่อให้สามารถจัดการ fn stop(&mut self)เป็นกินjoin()JoinHandle
  • อย่างไรก็ตามคุณกำลังพยายามที่จะส่ง&mut selfต่อไปยังการปิดบัญชีซึ่งทำให้เกิดปัญหามากขึ้นและมีการอ้างอิงที่ไม่แน่นอนหลายรายการ
    • Mutex<A>นี้สามารถแก้ไขได้ด้วยเช่น อย่างไรก็ตามหากคุณโทรstop()ไปนั่นอาจนำไปสู่การชะงักงันแทน

อย่างไรก็ตามเนื่องจากเป็นรหัสจำลองและคุณได้ชี้แจงในความคิดเห็น ให้ฉันลองอธิบายความหมายของคุณด้วยตัวอย่างบางส่วน ซึ่งรวมถึงฉันเขียนโค้ดจำลองของคุณใหม่ด้วย

ผลลัพธ์หลังจากคนงานทำเสร็จ

หากคุณไม่ต้องการเข้าถึงข้อมูลในขณะที่เธรดผู้ปฏิบัติงานกำลังทำงานอยู่คุณสามารถสร้างไฟล์struct WorkerData. จากนั้นให้run()คุณคัดลอก / โคลนข้อมูลที่คุณต้องการA(หรือตามที่ฉันเปลี่ยนชื่อWorker) จากนั้นในการปิดคุณก็กลับมาอีกครั้งเพื่อให้คุณสามารถได้รับมันผ่านdatajoin()

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

คุณไม่ได้จริงๆต้องthreadที่จะเป็นและแทนที่จะทำได้เพียงแค่การใช้งานOption<JoinHandle<WorkerData>> JoinHandle<WorkerData>>เพราะถ้าคุณต้องการเรียกrun()อีกครั้งการกำหนดตัวแปรที่ถือไฟล์Worker.

ดังนั้นตอนนี้เราสามารถลดความซับซ้อนWorkerลบOptionและการเปลี่ยนแปลงstopที่จะบริโภคthreadแทนพร้อมกับการสร้างในสถานที่ของnew() -> Selfrun(&mut self)

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

แชร์ WorkerData

หากคุณต้องการที่จะรักษาอ้างอิงถึงระหว่างหลายหัวข้อแล้วคุณจะต้องใช้WorkerData Arcเนื่องจากคุณต้องการที่จะกลายพันธุ์เพิ่มเติมคุณจึงต้องใช้ไฟล์Mutex.

หากคุณจะกลายพันธุ์ภายในเธรดเดียวคุณสามารถเปลี่ยนเป็น a RwLockซึ่งเมื่อเทียบกับ a Mutexจะช่วยให้คุณสามารถล็อกและรับข้อมูลอ้างอิงที่ไม่เปลี่ยนรูปได้หลายรายการในเวลาเดียวกัน

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

หากคุณเพิ่มวิธีการที่จะได้รับdataในรูปแบบของArc<RwLock<WorkerData>>. จากนั้นถ้าคุณโคลนArcและล็อค (ด้านในRwLock) ก่อนที่จะโทรstop()ก็จะส่งผลให้เกิดการชะงักงัน เพื่อหลีกเลี่ยงสิ่งนั้นdata()วิธีใด ๆควรส่งคืน&WorkerDataหรือ&mut WorkerDataแทนที่จะเป็นArc. ด้วยวิธีนี้คุณจะไม่สามารถโทรหาstop()และทำให้เกิดการชะงักงันได้

ตั้งค่าสถานะให้หยุดคนงาน

หากคุณต้องการหยุดเธรดผู้ปฏิบัติงานจริงคุณจะต้องใช้แฟล็กเพื่อส่งสัญญาณให้ทำเช่นนั้น AtomicBoolคุณสามารถสร้างธงในรูปแบบของที่ใช้ร่วมกัน

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

เธรดหลายเธรดและหลายงาน

หากคุณต้องการให้มีการประมวลผลงานหลายประเภทให้กระจายไปตามเธรดหลายเธรดต่อไปนี้เป็นตัวอย่างทั่วไป

คุณได้กล่าวถึงแล้วโดยใช้mpsc. ดังนั้นคุณสามารถใช้ a SenderและReceiverร่วมกับกำหนดเองTaskและTaskResultenum

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

ตัวอย่างของการใช้Workerพร้อมกับการส่งงานและรับผลลัพธ์ของงานจะมีลักษณะดังนี้:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

ในบางกรณีคุณอาจจำเป็นต้องใช้SendสำหรับการและTask / หรือ ตรวจสอบ"การทำความเข้าใจส่งลักษณะ"TaskResult

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

พารามิเตอร์ type ของ a JoinHandleควรเป็นประเภทการส่งคืนของฟังก์ชันของเธรด

ในกรณีนี้ชนิดกลับเป็นอันดับที่ว่างเปล่า(), เด่นชัดหน่วย จะใช้เมื่อมีเพียงค่าเดียวที่เป็นไปได้และเป็น "ประเภทการส่งคืน" โดยปริยายของฟังก์ชันเมื่อไม่ได้ระบุประเภทการส่งคืน

คุณสามารถเขียนJoinHandle<()>เพื่อแสดงว่าฟังก์ชันจะไม่ส่งคืนอะไรเลย

(หมายเหตุ: รหัสของคุณจะพบปัญหาตัวตรวจสอบการยืมself.call()ซึ่งอาจต้องได้รับการแก้ไขArc<Mutex<Self>>แต่นั่นเป็นอีกคำถามหนึ่ง)