วิธีเริ่มและหยุดเธรดผู้ปฏิบัติงาน
ฉันมีข้อกำหนดต่อไปนี้ซึ่งเป็นมาตรฐานในภาษาโปรแกรมอื่น ๆ แต่ฉันไม่รู้ว่าจะทำอย่างไรใน Rust
ฉันมีคลาสฉันต้องการเขียนวิธีการสร้างเธรดผู้ปฏิบัติงานที่ตรงตามเงื่อนไข 2 ประการ:
- หลังจากวางไข่เธรดผู้ปฏิบัติงานฟังก์ชันจะถูกส่งกลับ (ดังนั้นที่อื่นไม่จำเป็นต้องรอ)
- มีกลไกในการหยุดเธรดนี้
ตัวอย่างเช่นนี่คือรหัสจำลองของฉัน:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
thread: JoinHandle<?>
ผมมีข้อผิดพลาดที่ ประเภทของเธรดในกรณีนี้คืออะไร และรหัสของฉันถูกต้องในการเริ่มและหยุดเธรดผู้ปฏิบัติงานหรือไม่
คำตอบ
ในระยะสั้นT
ในjoin()ในผลตอบแทนที่ผลของการปิดผ่านไปJoinHandleJoinHandle<?>
JoinHandle<()>
()
นอกเหนือจากนั้นโค้ดจำลองของคุณยังมีปัญหาเพิ่มเติมอีกเล็กน้อย
- ประเภทการกลับมาของไม่ถูกต้องและจะต้องไปอย่างน้อยที่สุด
run()
Result<(), ()>
thread
ฟิลด์จะต้องมีOption<JoinHandle<()>
เพื่อให้สามารถจัดการfn stop(&mut self)
เป็นกินjoin()JoinHandle
- อย่างไรก็ตามคุณกำลังพยายามที่จะส่ง
&mut self
ต่อไปยังการปิดบัญชีซึ่งทำให้เกิดปัญหามากขึ้นและมีการอ้างอิงที่ไม่แน่นอนหลายรายการMutex<A>
นี้สามารถแก้ไขได้ด้วยเช่น อย่างไรก็ตามหากคุณโทรstop()
ไปนั่นอาจนำไปสู่การชะงักงันแทน
อย่างไรก็ตามเนื่องจากเป็นรหัสจำลองและคุณได้ชี้แจงในความคิดเห็น ให้ฉันลองอธิบายความหมายของคุณด้วยตัวอย่างบางส่วน ซึ่งรวมถึงฉันเขียนโค้ดจำลองของคุณใหม่ด้วย
ผลลัพธ์หลังจากคนงานทำเสร็จ
หากคุณไม่ต้องการเข้าถึงข้อมูลในขณะที่เธรดผู้ปฏิบัติงานกำลังทำงานอยู่คุณสามารถสร้างไฟล์struct WorkerData
. จากนั้นให้run()
คุณคัดลอก / โคลนข้อมูลที่คุณต้องการA
(หรือตามที่ฉันเปลี่ยนชื่อWorker
) จากนั้นในการปิดคุณก็กลับมาอีกครั้งเพื่อให้คุณสามารถได้รับมันผ่านdata
join()
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
คุณไม่ได้จริงๆต้องthread
ที่จะเป็นและแทนที่จะทำได้เพียงแค่การใช้งานOption<JoinHandle<WorkerData>>
JoinHandle<WorkerData>>
เพราะถ้าคุณต้องการเรียกrun()
อีกครั้งการกำหนดตัวแปรที่ถือไฟล์Worker
.
ดังนั้นตอนนี้เราสามารถลดความซับซ้อนWorker
ลบOption
และการเปลี่ยนแปลงstop
ที่จะบริโภคthread
แทนพร้อมกับการสร้างในสถานที่ของnew() -> Self
run(&mut self)
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
แชร์ WorkerData
หากคุณต้องการที่จะรักษาอ้างอิงถึงระหว่างหลายหัวข้อแล้วคุณจะต้องใช้WorkerData
Arcเนื่องจากคุณต้องการที่จะกลายพันธุ์เพิ่มเติมคุณจึงต้องใช้ไฟล์Mutex.
หากคุณจะกลายพันธุ์ภายในเธรดเดียวคุณสามารถเปลี่ยนเป็น a RwLockซึ่งเมื่อเทียบกับ a Mutex
จะช่วยให้คุณสามารถล็อกและรับข้อมูลอ้างอิงที่ไม่เปลี่ยนรูปได้หลายรายการในเวลาเดียวกัน
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
หากคุณเพิ่มวิธีการที่จะได้รับdata
ในรูปแบบของArc<RwLock<WorkerData>>
. จากนั้นถ้าคุณโคลนArc
และล็อค (ด้านในRwLock
) ก่อนที่จะโทรstop()
ก็จะส่งผลให้เกิดการชะงักงัน เพื่อหลีกเลี่ยงสิ่งนั้นdata()
วิธีใด ๆควรส่งคืน&WorkerData
หรือ&mut WorkerData
แทนที่จะเป็นArc
. ด้วยวิธีนี้คุณจะไม่สามารถโทรหาstop()
และทำให้เกิดการชะงักงันได้
ตั้งค่าสถานะให้หยุดคนงาน
หากคุณต้องการหยุดเธรดผู้ปฏิบัติงานจริงคุณจะต้องใช้แฟล็กเพื่อส่งสัญญาณให้ทำเช่นนั้น AtomicBoolคุณสามารถสร้างธงในรูปแบบของที่ใช้ร่วมกัน
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
เธรดหลายเธรดและหลายงาน
หากคุณต้องการให้มีการประมวลผลงานหลายประเภทให้กระจายไปตามเธรดหลายเธรดต่อไปนี้เป็นตัวอย่างทั่วไป
คุณได้กล่าวถึงแล้วโดยใช้mpsc. ดังนั้นคุณสามารถใช้ a SenderและReceiverร่วมกับกำหนดเองTask
และTaskResult
enum
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
ตัวอย่างของการใช้Worker
พร้อมกับการส่งงานและรับผลลัพธ์ของงานจะมีลักษณะดังนี้:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
ในบางกรณีคุณอาจจำเป็นต้องใช้Send
สำหรับการและTask
/ หรือ ตรวจสอบ"การทำความเข้าใจส่งลักษณะ"TaskResult
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
พารามิเตอร์ type ของ a JoinHandle
ควรเป็นประเภทการส่งคืนของฟังก์ชันของเธรด
ในกรณีนี้ชนิดกลับเป็นอันดับที่ว่างเปล่า()
, เด่นชัดหน่วย จะใช้เมื่อมีเพียงค่าเดียวที่เป็นไปได้และเป็น "ประเภทการส่งคืน" โดยปริยายของฟังก์ชันเมื่อไม่ได้ระบุประเภทการส่งคืน
คุณสามารถเขียนJoinHandle<()>
เพื่อแสดงว่าฟังก์ชันจะไม่ส่งคืนอะไรเลย
(หมายเหตุ: รหัสของคุณจะพบปัญหาตัวตรวจสอบการยืมself.call()
ซึ่งอาจต้องได้รับการแก้ไขArc<Mutex<Self>>
แต่นั่นเป็นอีกคำถามหนึ่ง)