वर्कर थ्रेड को कैसे शुरू करें और बंद करें

Jan 03 2021

मुझे निम्नलिखित आवश्यकता है जो अन्य प्रोग्रामिंग भाषाओं में मानक है, लेकिन मुझे पता नहीं है कि जंग में कैसे करना है।

मेरे पास एक वर्ग है, मैं एक कार्यकर्ता सूत्र को स्पॉन करने के लिए एक विधि लिखना चाहता हूं जो 2 स्थितियों को संतुष्ट करता है:

  • श्रमिक धागे को दबाने के बाद, फ़ंक्शन वापस आ जाता है (इसलिए अन्य स्थान पर प्रतीक्षा करने की आवश्यकता नहीं है)
  • इस धागे को रोकने के लिए एक तंत्र है।

उदाहरण के लिए, यहां मेरा डमी कोड है:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

मैं कम से एक त्रुटि है thread: JoinHandle<?>। इस मामले में धागे का प्रकार क्या है। और क्या एक श्रमिक सूत्र को शुरू करने और रोकने के लिए मेरा कोड सही है?

जवाब

3 vallentin Jan 03 2021 at 02:07

संक्षेप में, Tमें join()एक पर JoinHandleरिटर्न बंद का परिणाम के लिए पारित किया thread::spawn()। इसलिए आपके मामले में आपके क्लोजर रिटर्न यानी कुछ नहीं (यूनिट) के रूप में JoinHandle<?>होना चाहिए ।JoinHandle<()>()

इसके अलावा, आपके डमी कोड में कुछ अतिरिक्त मुद्दे हैं।

  • वापसी प्रकार run()गलत है, और कम से कम होना चाहिए Result<(), ()>
  • threadक्षेत्र की आवश्यकता होगी Option<JoinHandle<()>करने के लिए सक्षम होने के लिए संभाल fn stop(&mut self) के रूप में join()खपत JoinHandle
  • हालांकि, आप &mut selfक्लोजर को पास करने का प्रयास कर रहे हैं , जो कई और मुद्दों को लाता है, जो कई परस्पर संदर्भों के लिए उबलते हैं
    • इसे उदाहरण के साथ हल किया जा सकता है Mutex<A>। हालांकि, अगर आप फोन करते हैं stop()तो इसके बजाय गतिरोध पैदा हो सकता है।

हालाँकि, चूंकि यह डमी कोड था, और आपने टिप्पणियों में स्पष्ट किया। कुछ उदाहरणों के साथ आपका क्या मतलब है मुझे स्पष्ट करने का प्रयास करें। इसमें मुझे आपका डमी कोड लिखना है।

कार्यकर्ता के बाद परिणाम

यदि श्रमिक थ्रेड चलाते समय आपको डेटा तक पहुंच की आवश्यकता नहीं है, तो आप एक नया बना सकते हैं struct WorkerData। फिर run()आप उस डेटा को कॉपी / क्लोन करते हैं जिसकी आपको आवश्यकता है A(या जैसा कि मैंने इसका नाम बदला है Worker)। फिर बंद होने के बाद आप अंत में dataफिर से लौट आते हैं , इसलिए आप इसे हासिल कर सकते हैं join()

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

तुम सच में होने की जरूरत नहीं threadहै Option<JoinHandle<WorkerData>>और इसके बजाय सिर्फ उपयोग कर सकते हैं JoinHandle<WorkerData>>। क्योंकि यदि आप run()फिर से कॉल करना चाहते हैं , तो यह आसान होगा कि पकड़े हुए वेरिएबल को फिर से असाइन करें Worker

तो अब हम को आसान बनाने में कर सकते हैं Worker, को दूर करने Optionऔर बदलने stopका उपभोग करने के threadबजाय, बनाने के साथ साथ new() -> Selfके स्थान पर run(&mut self)

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

साझा WorkerData

यदि आप WorkerDataकई थ्रेड्स के बीच संदर्भों को बनाए रखना चाहते हैं , तो आपको उपयोग करने की आवश्यकता होगी Arc। चूंकि आप इसके अलावा इसे बदलना चाहते हैं, इसलिए आपको इसका उपयोग करने की आवश्यकता होगी Mutex।

यदि आप केवल एक ही धागे के भीतर परिवर्तन कर रहे हैं, तो आप वैकल्पिक रूप से एक कर सकते हैं RwLock, जो एक Mutexइच्छा की तुलना में आपको एक ही समय में कई अपरिवर्तनीय संदर्भों को लॉक करने और प्राप्त करने की अनुमति देगा।

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

यदि आप dataके रूप में प्राप्त करने में सक्षम होने के लिए एक विधि जोड़ते हैं Arc<RwLock<WorkerData>>। तब यदि आप क्लोन करते हैं Arcऔर RwLockकॉल करने से पहले (भीतर ) लॉक करते हैं stop(), तो इसके परिणामस्वरूप गतिरोध होगा। उससे बचने के लिए, किसी भी data()विधि को लौटना चाहिए &WorkerDataया &mut WorkerDataइसके बजाय Arc। इस तरह आप कॉल नहीं कर पाएंगे stop()और गतिरोध पैदा कर सकते हैं।

कार्यकर्ता को रोकने के लिए झंडा

यदि आप वास्तव में कार्यकर्ता थ्रेड को रोकना चाहते हैं, तो आपको ऐसा करने के लिए संकेत करने के लिए एक ध्वज का उपयोग करना होगा। आप साझा के रूप में एक ध्वज बना सकते हैं AtomicBool।

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

कई सूत्र और कई कार्य

यदि आप कई प्रकार के कार्यों को संसाधित करना चाहते हैं, तो कई थ्रेड्स में फैले हुए हैं, तो यहां एक अधिक सामान्यीकृत उदाहरण है।

आपने पहले ही उपयोग का उल्लेख किया है mpsc। तो तुम एक का उपयोग कर सकते Senderहैं और Receiverएक कस्टम के साथ-साथ Taskऔर TaskResultenum।

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Workerकार्यों को भेजने और कार्य परिणाम प्राप्त करने के साथ उपयोग करने का एक उदाहरण , फिर इस तरह दिखेगा:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

कुछ मामलों में आप को लागू करने की आवश्यकता हो सकती है Sendके लिए Taskऔर / या TaskResultकी जाँच करें "संदेश विशेषता को समझना" ।

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

प्रकार का पैरामीटर JoinHandleथ्रेड फ़ंक्शन के रिटर्न प्रकार होना चाहिए।

इस मामले में, वापसी प्रकार एक खाली टपल (), उच्चारित इकाई है । इसका उपयोग तब किया जाता है जब केवल एक ही मूल्य संभव होता है, और कोई रिटर्न प्रकार निर्दिष्ट नहीं होने पर कार्यों का निहितार्थ "वापसी प्रकार" होता है।

आप केवल JoinHandle<()>यह दर्शाने के लिए लिख सकते हैं कि फ़ंक्शन कुछ भी वापस नहीं करेगा।

(नोट: आपका कोड कुछ उधार लेने वाले चेकर मुद्दों में चलेगा self.call(), जिसके साथ शायद हल करने की आवश्यकता होगी Arc<Mutex<Self>>, लेकिन यह एक और सवाल है।)