Cách bắt đầu và dừng một chuỗi công nhân
Tôi có một yêu cầu sau đây là yêu cầu tiêu chuẩn trong các ngôn ngữ lập trình khác nhưng tôi không biết cách thực hiện trong Rust.
Tôi có một lớp, tôi muốn viết một phương thức để tạo ra một chuỗi công nhân thỏa mãn 2 điều kiện:
- Sau khi tạo chuỗi công nhân, hàm sẽ được trả về (vì vậy nơi khác không cần đợi)
- Có một cơ chế để dừng luồng này.
Ví dụ, đây là mã giả của tôi:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
Tôi có một lỗi tại thread: JoinHandle<?>
. Loại chủ đề trong trường hợp này là gì. Và mã của tôi có chính xác để bắt đầu và dừng một luồng công nhân không?
Trả lời
Nói tóm lại, T
in join()on a JoinHandleJoinHandle<?>
sẽ cần phải là JoinHandle<()>
như đóng cửa của bạn không trả lại gì , tức là ()(đơn vị) .
Ngoài ra, mã giả của bạn chứa một số vấn đề bổ sung.
- Loại trả về
run()
là không chính xác và ít nhất sẽ phải như vậyResult<(), ()>
. - Các
thread
lĩnh vực sẽ cần phải cóOption<JoinHandle<()>
để có thể xử lýfn stop(&mut self)
như join()tiêu thụ cácJoinHandle
. - Tuy nhiên, bạn đang cố gắng chuyển
&mut self
sang phần đóng, điều này mang lại nhiều vấn đề hơn, dẫn đến nhiều tham chiếu có thể thay đổi- Điều này có thể được giải quyết với ví dụ
Mutex<A>
. Tuy nhiên, nếu bạn gọistop()
thì điều đó có thể dẫn đến bế tắc.
- Điều này có thể được giải quyết với ví dụ
Tuy nhiên, vì nó là mã giả, và bạn đã làm rõ trong phần bình luận. Hãy để tôi thử và làm rõ ý của bạn với một vài ví dụ. Điều này bao gồm việc tôi viết lại mã giả của bạn.
Kết quả sau khi công nhân làm xong
Nếu bạn không cần quyền truy cập vào dữ liệu trong khi chuỗi công nhân đang chạy, thì bạn có thể tạo mới struct WorkerData
. Sau đó, run()
bạn sao chép / sao chép dữ liệu bạn cần A
(hoặc như tôi đã đổi tên nó Worker
). Sau đó, cuối cùng bạn quay trở lại data
một lần nữa, vì vậy bạn có thể tiếp thu nó thông qua join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Bạn không thực sự cần thread
phải có Option<JoinHandle<WorkerData>>
và thay vào đó chỉ cần sử dụng JoinHandle<WorkerData>>
. Bởi vì nếu bạn muốn gọi run()
lại, sẽ dễ dàng hơn khi gán lại biến đang giữ Worker
.
Vì vậy, bây giờ chúng ta có thể đơn giản hóa Worker
, loại bỏ Option
và thay đổi stop
để tiêu thụ thread
thay vào đó, cùng với việc tạo new() -> Self
tại chỗ run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Được chia sẻ WorkerData
Nếu bạn muốn giữ lại các tham chiếu đến WorkerData
giữa nhiều chuỗi, thì bạn cần phải sử dụng Arc. Vì bạn cũng muốn có thể thay đổi nó, bạn sẽ cần phải sử dụng một Mutex.
Nếu bạn chỉ thay đổi trong một chuỗi duy nhất, thì bạn có thể thay thế bạn a RwLock, so với a Mutex
sẽ cho phép bạn khóa và nhận được nhiều tham chiếu bất biến cùng một lúc.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Nếu bạn thêm một phương thức để có thể lấy data
ở dạng Arc<RwLock<WorkerData>>
. Sau đó, nếu bạn sao chép Arc
và khóa nó (bên trong RwLock
) trước khi gọi stop()
, thì điều đó sẽ dẫn đến bế tắc. Để tránh điều đó, bất kỳ data()
phương thức nào cũng nên trả về &WorkerData
hoặc &mut WorkerData
thay vì Arc
. Bằng cách đó, bạn sẽ không thể gọi stop()
và gây ra bế tắc.
Cờ để dừng công nhân
Nếu bạn thực sự muốn dừng chuỗi công nhân, thì bạn phải sử dụng cờ để báo hiệu nó làm như vậy. Bạn có thể tạo cờ dưới dạng chia sẻ AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Nhiều chủ đề và nhiều nhiệm vụ
Nếu bạn muốn nhiều loại tác vụ được xử lý, trải dài trên nhiều chuỗi, thì đây là một ví dụ tổng quát hơn.
Bạn đã đề cập đến việc sử dụng mpsc. Vì vậy, bạn có thể sử dụng a Sendervà Receivercùng với một tùy chỉnh Task
và TaskResult
enum.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Một ví dụ về việc sử dụng Worker
cùng với việc gửi tác vụ và nhận kết quả tác vụ, sau đó sẽ trông như thế này:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
Trong một số trường hợp, bạn có thể cần phải triển khai Send
cho Task
và / hoặc TaskResult
. Kiểm tra "Hiểu đặc điểm Gửi" .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
Tham số kiểu của a JoinHandle
phải là kiểu trả về của hàm của luồng.
Trong trường hợp này, kiểu trả về là một bộ giá trị trống ()
, đơn vị được phát âm . Nó được sử dụng khi chỉ có một giá trị duy nhất có thể và là "kiểu trả về" ngầm định của các hàm khi không có kiểu trả về nào được chỉ định.
Bạn chỉ có thể viết JoinHandle<()>
để biểu thị rằng hàm sẽ không trả về bất cứ thứ gì.
(Lưu ý: Mã của bạn sẽ gặp phải một số vấn đề với trình kiểm tra khoản vay self.call()
, có thể cần phải giải quyết Arc<Mutex<Self>>
, nhưng đó là một câu hỏi khác.)