Çalışan iş parçacığı nasıl başlatılır ve durdurulur
Diğer programlama dillerinde standart olan aşağıdaki bir gereksinimim var, ancak Rust'ta nasıl yapılacağını bilmiyorum.
Bir sınıfım var, 2 koşulu karşılayan bir işçi iş parçacığı oluşturmak için bir yöntem yazmak istiyorum:
- Çalışan iş parçacığı oluşturulduktan sonra işlev geri döner (bu nedenle diğer yerin beklemesi gerekmez)
- Bu ipliği durdurmak için bir mekanizma var.
Örneğin, işte sahte kodum:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
Adresinde bir hatam var thread: JoinHandle<?>
. Bu durumda iplik türü nedir? Ve bir çalışan iş parçacığını başlatmak ve durdurmak için kodum doğru mu?
Yanıtlar
Kısaca, T
in a join()on JoinHandleJoinHandle<?>
olması gerekir JoinHandle<()>
sizin kapanış döner olarak hiçbir şey yani ()(birim) .
Bunun dışında, sahte kodunuz birkaç ek sorun içerir.
- Dönüş türü
run()
yanlış ve en azından olması gerekirResult<(), ()>
. thread
Alan olması gerekirOption<JoinHandle<()>
edebilmek için idarefn stop(&mut self)
olarak join()tüketirJoinHandle
.- Ancak,
&mut self
kapanışa geçmeye çalışıyorsunuz , bu da çok daha fazla sorunu beraberinde getiriyor ve birden fazla değişken referansa indirgeniyor.- Bu, örneğin çözülebilir
Mutex<A>
. Ancak, ararsanız,stop()
bunun yerine bir kilitlenmeye yol açabilir.
- Bu, örneğin çözülebilir
Ancak, sahte kod olduğundan ve yorumlarda açıkladınız. Birkaç örnekle ne demek istediğini açıklamama izin ver. Bu, sahte kodunuzu yeniden yazmamı içerir.
İşçi bittikten sonraki sonuç
Çalışan iş parçacığı çalışırken verilere erişmeniz gerekmiyorsa, yeni bir struct WorkerData
. Sonra içinde run()
ihtiyacınız olan verileri kopyalayın / klonlayın A
(veya yeniden adlandırdığım gibi Worker
). Sonra kapanışta nihayet data
tekrar geri dönersiniz , böylece onu elde edebilirsiniz join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Gerçekten gerek yok thread
olmaya Option<JoinHandle<WorkerData>>
ve bunun yerine sadece kullanabilirsiniz JoinHandle<WorkerData>>
. Çünkü run()
tekrar aramak isteseydiniz , değişkeni Worker
.
Şimdi biz kolaylaştırabilirsiniz Worker
çıkarmadan, Option
ve değiştirmek stop
tüketmek thread
oluşturarak birlikte yerine new() -> Self
yerine run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Paylaşılan WorkerData
Birden WorkerData
çok iş parçacığı arasındaki başvuruları saklamak istiyorsanız , kullanmanız gerekir Arc. Ek olarak onu değiştirebilmek istediğiniz için, bir Mutex.
Yalnızca tek bir iş parçacığı içinde mutasyona uğrayacaksanız RwLock, alternatif olarak , a ile karşılaştırıldığında Mutex
, aynı anda birden fazla değişmez referansı kilitlemenize ve elde etmenize olanak tanıyan a'yı yapabilirsiniz.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Eğer bir yöntemi eklerseniz elde edebilmek için data
şeklinde Arc<RwLock<WorkerData>>
. Ardından , aramadan önce klonlar Arc
ve kilitlerseniz (iç RwLock
) stop()
, bu bir kilitlenmeyle sonuçlanır. Bunu önlemek için herhangi bir data()
yöntem dönmelidir &WorkerData
veya &mut WorkerData
yerine Arc
. Bu şekilde stop()
arayamaz ve bir çıkmaza neden olamazsınız.
İşçiyi durdurmak için işaretle
Çalışan iş parçacığını gerçekten durdurmak istiyorsanız, bunu yapmasını işaret etmek için bir bayrak kullanmanız gerekir. Paylaşılan şeklinde bir bayrak oluşturabilirsiniz AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Birden çok iş parçacığı ve birden çok görev
Birden çok türde görevin işlenmesini, birden çok iş parçacığına yayılmasını istiyorsanız, işte size daha genel bir örnek.
Kullanmaktan zaten bahsetmiştin mpsc. Böylece bir özel ve enum ile birlikte Senderve kullanabilirsiniz .ReceiverTask
TaskResult
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Worker
Görev gönderme ve görev sonuçlarını alma ile birlikte kullanımına bir örnek daha sonra şöyle görünecektir:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
Bazı durumlarda uygulamak gerekebilir Send
için Task
ve / veya TaskResult
. Check out "Gönder özelliğini anlama" .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
A'nın tür parametresi JoinHandle
, evre işlevinin dönüş türü olmalıdır.
Bu durumda, dönüş türü boş bir tuple ()
, telaffuz edilen birimdir . Yalnızca bir değer mümkün olduğunda kullanılır ve hiçbir dönüş türü belirtilmediğinde işlevlerin örtük "dönüş türü" dir.
JoinHandle<()>
İşlevin hiçbir şey döndürmeyeceğini belirtmek için yazabilirsiniz .
(Not: Kodunuz, self.call()
muhtemelen çözülmesi gereken bazı ödünç denetleyicisi sorunlarıyla karşılaşacaktır Arc<Mutex<Self>>
, ancak bu başka bir soru.)