वर्कर थ्रेड को कैसे शुरू करें और बंद करें
मुझे निम्नलिखित आवश्यकता है जो अन्य प्रोग्रामिंग भाषाओं में मानक है, लेकिन मुझे पता नहीं है कि जंग में कैसे करना है।
मेरे पास एक वर्ग है, मैं एक कार्यकर्ता सूत्र को स्पॉन करने के लिए एक विधि लिखना चाहता हूं जो 2 स्थितियों को संतुष्ट करता है:
- श्रमिक धागे को दबाने के बाद, फ़ंक्शन वापस आ जाता है (इसलिए अन्य स्थान पर प्रतीक्षा करने की आवश्यकता नहीं है)
- इस धागे को रोकने के लिए एक तंत्र है।
उदाहरण के लिए, यहां मेरा डमी कोड है:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
मैं कम से एक त्रुटि है thread: JoinHandle<?>
। इस मामले में धागे का प्रकार क्या है। और क्या एक श्रमिक सूत्र को शुरू करने और रोकने के लिए मेरा कोड सही है?
जवाब
संक्षेप में, T
में join()एक पर JoinHandleJoinHandle<?>
होना चाहिए ।JoinHandle<()>
()
इसके अलावा, आपके डमी कोड में कुछ अतिरिक्त मुद्दे हैं।
- वापसी प्रकार
run()
गलत है, और कम से कम होना चाहिएResult<(), ()>
। thread
क्षेत्र की आवश्यकता होगीOption<JoinHandle<()>
करने के लिए सक्षम होने के लिए संभालfn stop(&mut self)
के रूप में join()खपतJoinHandle
।- हालांकि, आप
&mut self
क्लोजर को पास करने का प्रयास कर रहे हैं , जो कई और मुद्दों को लाता है, जो कई परस्पर संदर्भों के लिए उबलते हैं- इसे उदाहरण के साथ हल किया जा सकता है
Mutex<A>
। हालांकि, अगर आप फोन करते हैंstop()
तो इसके बजाय गतिरोध पैदा हो सकता है।
- इसे उदाहरण के साथ हल किया जा सकता है
हालाँकि, चूंकि यह डमी कोड था, और आपने टिप्पणियों में स्पष्ट किया। कुछ उदाहरणों के साथ आपका क्या मतलब है मुझे स्पष्ट करने का प्रयास करें। इसमें मुझे आपका डमी कोड लिखना है।
कार्यकर्ता के बाद परिणाम
यदि श्रमिक थ्रेड चलाते समय आपको डेटा तक पहुंच की आवश्यकता नहीं है, तो आप एक नया बना सकते हैं struct WorkerData
। फिर run()
आप उस डेटा को कॉपी / क्लोन करते हैं जिसकी आपको आवश्यकता है A
(या जैसा कि मैंने इसका नाम बदला है Worker
)। फिर बंद होने के बाद आप अंत में data
फिर से लौट आते हैं , इसलिए आप इसे हासिल कर सकते हैं join()
।
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
तुम सच में होने की जरूरत नहीं thread
है Option<JoinHandle<WorkerData>>
और इसके बजाय सिर्फ उपयोग कर सकते हैं JoinHandle<WorkerData>>
। क्योंकि यदि आप run()
फिर से कॉल करना चाहते हैं , तो यह आसान होगा कि पकड़े हुए वेरिएबल को फिर से असाइन करें Worker
।
तो अब हम को आसान बनाने में कर सकते हैं Worker
, को दूर करने Option
और बदलने stop
का उपभोग करने के thread
बजाय, बनाने के साथ साथ new() -> Self
के स्थान पर run(&mut self)
।
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
साझा WorkerData
यदि आप WorkerData
कई थ्रेड्स के बीच संदर्भों को बनाए रखना चाहते हैं , तो आपको उपयोग करने की आवश्यकता होगी Arc। चूंकि आप इसके अलावा इसे बदलना चाहते हैं, इसलिए आपको इसका उपयोग करने की आवश्यकता होगी Mutex।
यदि आप केवल एक ही धागे के भीतर परिवर्तन कर रहे हैं, तो आप वैकल्पिक रूप से एक कर सकते हैं RwLock, जो एक Mutex
इच्छा की तुलना में आपको एक ही समय में कई अपरिवर्तनीय संदर्भों को लॉक करने और प्राप्त करने की अनुमति देगा।
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
यदि आप data
के रूप में प्राप्त करने में सक्षम होने के लिए एक विधि जोड़ते हैं Arc<RwLock<WorkerData>>
। तब यदि आप क्लोन करते हैं Arc
और RwLock
कॉल करने से पहले (भीतर ) लॉक करते हैं stop()
, तो इसके परिणामस्वरूप गतिरोध होगा। उससे बचने के लिए, किसी भी data()
विधि को लौटना चाहिए &WorkerData
या &mut WorkerData
इसके बजाय Arc
। इस तरह आप कॉल नहीं कर पाएंगे stop()
और गतिरोध पैदा कर सकते हैं।
कार्यकर्ता को रोकने के लिए झंडा
यदि आप वास्तव में कार्यकर्ता थ्रेड को रोकना चाहते हैं, तो आपको ऐसा करने के लिए संकेत करने के लिए एक ध्वज का उपयोग करना होगा। आप साझा के रूप में एक ध्वज बना सकते हैं AtomicBool।
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
कई सूत्र और कई कार्य
यदि आप कई प्रकार के कार्यों को संसाधित करना चाहते हैं, तो कई थ्रेड्स में फैले हुए हैं, तो यहां एक अधिक सामान्यीकृत उदाहरण है।
आपने पहले ही उपयोग का उल्लेख किया है mpsc। तो तुम एक का उपयोग कर सकते Senderहैं और Receiverएक कस्टम के साथ-साथ Task
और TaskResult
enum।
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Worker
कार्यों को भेजने और कार्य परिणाम प्राप्त करने के साथ उपयोग करने का एक उदाहरण , फिर इस तरह दिखेगा:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
कुछ मामलों में आप को लागू करने की आवश्यकता हो सकती है Send
के लिए Task
और / या TaskResult
। की जाँच करें "संदेश विशेषता को समझना" ।
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
प्रकार का पैरामीटर JoinHandle
थ्रेड फ़ंक्शन के रिटर्न प्रकार होना चाहिए।
इस मामले में, वापसी प्रकार एक खाली टपल ()
, उच्चारित इकाई है । इसका उपयोग तब किया जाता है जब केवल एक ही मूल्य संभव होता है, और कोई रिटर्न प्रकार निर्दिष्ट नहीं होने पर कार्यों का निहितार्थ "वापसी प्रकार" होता है।
आप केवल JoinHandle<()>
यह दर्शाने के लिए लिख सकते हैं कि फ़ंक्शन कुछ भी वापस नहीं करेगा।
(नोट: आपका कोड कुछ उधार लेने वाले चेकर मुद्दों में चलेगा self.call()
, जिसके साथ शायद हल करने की आवश्यकता होगी Arc<Mutex<Self>>
, लेकिन यह एक और सवाल है।)