ワーカースレッドを開始および停止する方法
他のプログラミング言語では標準的な次の要件がありますが、Rustで行う方法がわかりません。
クラスがあり、2つの条件を満たすワーカースレッドを生成するメソッドを記述したいと思います。
- ワーカースレッドを生成した後、関数は戻ります(したがって、他の場所は待つ必要はありません)
- このスレッドを停止するメカニズムがあります。
たとえば、これが私のダミーコードです。
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
でエラーが発生しましたthread: JoinHandle<?>
。この場合のスレッドのタイプは何ですか。そして、私のコードはワーカースレッドを開始および停止するのに正しいですか?
回答
つまり、T
in join()onJoinHandleJoinHandle<?>
必要がありJoinHandle<()>
ます。()
それ以外に、ダミーコードにはいくつかの追加の問題が含まれています。
- の戻り値の型
run()
が正しくないため、少なくともResult<(), ()>
。である必要があります。 thread
フィールドがあることが必要となるOption<JoinHandle<()>
ことができるようにハンドルfn stop(&mut self)
としてjoin()消費しますJoinHandle
。- ただし、
&mut self
クロージャに渡そうとしているため、さらに多くの問題が発生し、複数の可変参照に要約されます。- これは、例えばで解決することができます
Mutex<A>
。ただし、電話をかけるとstop()
、代わりにデッドロックが発生する可能性があります。
- これは、例えばで解決することができます
しかし、それはダミーコードだったので、コメントで明確にしました。いくつかの例を挙げて、あなたが何を意味するのかを明確にしてみましょう。これには、ダミーコードの書き換えも含まれます。
労働者が終わった後の結果
ワーカースレッドの実行中にデータにアクセスする必要がない場合は、新しいを作成できますstruct WorkerData
。次に、run()
必要なデータをコピー/クローンしますA
(または名前を変更しましたWorker
)。その後、クロージャーで最終的にdata
再び戻るので、を介してそれを取得できますjoin()
。
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
あなたは本当に必要はありませんthread
ようにOption<JoinHandle<WorkerData>>
とだけではなく、使用することができますJoinHandle<WorkerData>>
。run()
もう一度呼び出したい場合は、を保持している変数を再割り当てする方が簡単だからWorker
です。
だから今、私たちは単純化することができWorker
、削除、Option
および変更をstop
消費するためにthread
作成するとともに、代わりnew() -> Self
の代わりにrun(&mut self)
。
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
共有 WorkerData
WorkerData
複数のスレッド間での参照を保持する場合は、を使用する必要がありますArc。さらにそれを変更できるようにしたいので、を使用する必要がありますMutex。
単一のスレッド内でのみ変更する場合はRwLock、代わりに、Mutex
を使用することもできます。これをaと比較すると、複数の不変の参照を同時にロックして取得できます。
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
data
の形式で取得できるメソッドを追加した場合Arc<RwLock<WorkerData>>
。次に、を呼び出す前にクローンを作成しArc
てロックすると(内部RwLock
)stop()
、デッドロックが発生します。これを回避するには、すべてのdata()
メソッドが&WorkerData
またはの&mut WorkerData
代わりにを返す必要がありArc
ます。そうすれば、電話をかけstop()
てデッドロックを引き起こすことができなくなります。
労働者を停止するフラグ
実際にワーカースレッドを停止したい場合は、フラグを使用して停止するように通知する必要があります。共有の形式でフラグを作成できますAtomicBool。
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
複数のスレッドと複数のタスク
複数の種類のタスクを処理し、複数のスレッドに分散させたい場合は、より一般的な例を次に示します。
を使用してすでに言及しましたmpsc。したがって、カスタムおよび列挙型SenderとReceiver一緒におよびを使用できます。Task
TaskResult
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Worker
タスクの送信とタスク結果の受信とともにを使用する例は、次のようになります。
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
場合によってはSend
、Task
および/またはの実装が必要になることがありますTaskResult
。「送信特性を理解する」を確認してください。
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
aのtypeパラメーターはJoinHandle
、スレッドの関数の戻り値の型である必要があります。
この場合、戻り値の型は空のタプル()
、発音された単位です。可能な値が1つしかない場合に使用され、戻り値の型が指定されていない場合の関数の暗黙の「戻り値の型」です。
JoinHandle<()>
関数が何も返さないことを表すために書くことができます。
(注:コードは、でいくつかの借用チェッカーの問題に遭遇しますself.call()
。これはおそらくで解決する必要がありますが、それはArc<Mutex<Self>>
別の質問です。)