Cara memulai dan menghentikan utas pekerja
Saya memiliki persyaratan berikut yang merupakan standar dalam bahasa pemrograman lain tetapi saya tidak tahu bagaimana melakukannya di Rust.
Saya memiliki kelas, saya ingin menulis metode untuk menelurkan utas pekerja yang memenuhi 2 kondisi:
- Setelah memunculkan thread pekerja, fungsinya adalah kembali (jadi tempat lain tidak perlu menunggu)
- Ada mekanisme untuk menghentikan utas ini.
Misalnya, berikut adalah kode dummy saya:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
Saya mengalami kesalahan di thread: JoinHandle<?>
. Apa jenis utas dalam kasus ini. Dan apakah kode saya benar untuk memulai dan menghentikan thread pekerja?
Jawaban
Singkatnya, T
in join()on a JoinHandleJoinHandle<?>
akan perlu JoinHandle<()>
karena closure Anda tidak menghasilkan apa-apa , yaitu ()(unit) .
Selain itu, kode dummy Anda berisi beberapa masalah tambahan.
- Jenis kembalian
run()
salah, dan setidaknya harusResult<(), ()>
. - The
thread
lapangan akan perluOption<JoinHandle<()>
untuk dapat menanganifn stop(&mut self)
sebagai join()mengkonsumsi yangJoinHandle
. - Namun, Anda mencoba untuk lolos
&mut self
ke closure, yang membawa lebih banyak masalah, bermuara pada beberapa referensi yang bisa berubah- Ini bisa diselesaikan dengan mis
Mutex<A>
. Namun, jika Anda meneleponstop()
maka itu bisa menyebabkan kebuntuan.
- Ini bisa diselesaikan dengan mis
Namun, karena itu adalah kode tiruan, dan Anda mengklarifikasi di komentar. Izinkan saya mencoba menjelaskan apa yang Anda maksud dengan beberapa contoh. Ini termasuk saya menulis ulang kode dummy Anda.
Hasil setelah pekerja selesai
Jika Anda tidak memerlukan akses ke data saat thread pekerja sedang berjalan, Anda dapat membuat yang baru struct WorkerData
. Kemudian run()
Anda menyalin / mengkloning data yang Anda butuhkan A
(atau seperti yang telah saya ubah namanya Worker
). Kemudian di penutupan Anda akhirnya kembali data
lagi, sehingga Anda dapat memperolehnya melalui join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Anda tidak benar-benar perlu thread
menjadi Option<JoinHandle<WorkerData>>
dan sebagai gantinya bisa langsung menggunakan JoinHandle<WorkerData>>
. Karena jika Anda ingin memanggil run()
lagi, akan lebih mudah untuk menetapkan kembali variabel yang menahan Worker
.
Jadi sekarang kita bisa menyederhanakan Worker
, menghapus Option
dan mengubah stop
untuk dikonsumsi thread
sebagai gantinya, bersama dengan membuat new() -> Self
di tempat run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Bersama WorkerData
Jika Anda ingin mempertahankan referensi di WorkerData
antara beberapa utas, Anda harus menggunakan Arc. Karena Anda juga ingin dapat memutasinya, Anda harus menggunakan file Mutex.
Jika Anda hanya akan bermutasi dalam satu utas, maka Anda dapat memilih a RwLock, yang dibandingkan dengan a Mutex
akan memungkinkan Anda untuk mengunci dan mendapatkan beberapa referensi yang tidak dapat diubah pada saat yang bersamaan.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Jika Anda menambahkan metode untuk mendapatkan data
dalam bentuk Arc<RwLock<WorkerData>>
. Kemudian jika Anda mengkloning Arc
dan menguncinya (bagian dalam RwLock
) sebelum menelepon stop()
, maka itu akan mengakibatkan kebuntuan. Untuk menghindarinya, data()
metode apa pun harus mengembalikan &WorkerData
atau &mut WorkerData
sebagai pengganti Arc
. Dengan cara itu Anda tidak dapat menelepon stop()
dan menyebabkan kebuntuan.
Tandai untuk menghentikan pekerja
Jika Anda benar-benar ingin menghentikan utas pekerja, Anda harus menggunakan bendera untuk memberi isyarat agar melakukannya. Anda dapat membuat bendera dalam bentuk bersama AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Banyak utas dan banyak tugas
Jika Anda ingin beberapa jenis tugas diproses, tersebar di beberapa utas, berikut adalah contoh yang lebih umum.
Anda sudah menyebutkan menggunakan mpsc. Jadi Anda bisa menggunakan a Senderdan Receiverbersama dengan custom Task
dan TaskResult
enum.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Contoh penggunaan Worker
bersama dengan mengirim tugas dan menerima hasil tugas, akan terlihat seperti ini:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
Dalam beberapa kasus, Anda mungkin perlu menerapkan Send
untuk Task
dan / atau TaskResult
. Lihat "Memahami sifat Kirim" .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
Parameter tipe a JoinHandle
haruslah tipe kembalian dari fungsi utas.
Dalam hal ini, tipe yang dikembalikan adalah tupel kosong ()
, unit diucapkan . Ini digunakan ketika hanya ada satu nilai yang mungkin, dan merupakan "tipe pengembalian" fungsi yang implisit ketika tidak ada tipe pengembalian yang ditentukan.
Anda bisa menulis JoinHandle<()>
untuk menyatakan bahwa fungsi tersebut tidak akan mengembalikan apa pun.
(Catatan: Kode Anda akan mengalami beberapa masalah peminjam pemeriksa self.call()
, yang mungkin perlu diselesaikan dengan Arc<Mutex<Self>>
, tapi itu pertanyaan lain.)