ワーカースレッドを開始および停止する方法

Jan 03 2021

他のプログラミング言語では標準的な次の要件がありますが、Rustで行う方法がわかりません。

クラスがあり、2つの条件を満たすワーカースレッドを生成するメソッドを記述したいと思います。

  • ワーカースレッドを生成した後、関数は戻ります(したがって、他の場所は待つ必要はありません)
  • このスレッドを停止するメカニズムがあります。

たとえば、これが私のダミーコードです。

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

でエラーが発生しましたthread: JoinHandle<?>。この場合のスレッドのタイプは何ですか。そして、私のコードはワーカースレッドを開始および停止するのに正しいですか?

回答

3 vallentin Jan 03 2021 at 02:07

つまり、Tin join()onJoinHandleは、に渡されたクロージャの結果を返しますthread::spawn()。したがって、あなたの場合、クロージャは何も返さないので、つまり(ユニット)であるJoinHandle<?>必要がありJoinHandle<()>ます。()

それ以外に、ダミーコードにはいくつかの追加の問題が含まれています。

  • の戻り値の型run()が正しくないため、少なくともResult<(), ()>。である必要があります。
  • threadフィールドがあることが必要となるOption<JoinHandle<()>ことができるようにハンドル fn stop(&mut self)としてjoin()消費しますJoinHandle
  • ただし、&mut selfクロージャに渡そうとしているため、さらに多くの問題が発生し、複数の可変参照に要約されます。
    • これは、例えばで解決することができますMutex<A>。ただし、電話をかけるとstop()、代わりにデッドロックが発生する可能性があります。

しかし、それはダミーコードだったので、コメントで明確にしました。いくつかの例を挙げて、あなたが何を意味するのかを明確にしてみましょう。これには、ダミーコードの書き換えも含まれます。

労働者が終わった後の結果

ワーカースレッドの実行中にデータにアクセスする必要がない場合は、新しいを作成できますstruct WorkerData。次に、run()必要なデータをコピー/クローンしますA(または名前を変更しましたWorker)。その後、クロージャーで最終的にdata再び戻るので、を介してそれを取得できますjoin()

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

あなたは本当に必要はありませんthreadようにOption<JoinHandle<WorkerData>>とだけではなく、使用することができますJoinHandle<WorkerData>>run()もう一度呼び出したい場合は、を保持している変数を再割り当てする方が簡単だからWorkerです。

だから今、私たちは単純化することができWorker、削除、Optionおよび変更をstop消費するためにthread作成するとともに、代わりnew() -> Selfの代わりにrun(&mut self)

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

共有 WorkerData

WorkerData複数のスレッド間での参照を保持する場合は、を使用する必要がありますArc。さらにそれを変更できるようにしたいので、を使用する必要がありますMutex。

単一のスレッド内でのみ変更する場合はRwLock、代わりに、Mutexを使用することもできます。これをaと比較すると、複数の不変の参照を同時にロックして取得できます。

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

dataの形式で取得できるメソッドを追加した場合Arc<RwLock<WorkerData>>。次に、を呼び出す前にクローンを作成しArcてロックすると(内部RwLockstop()、デッドロックが発生します。これを回避するには、すべてのdata()メソッドが&WorkerDataまたはの&mut WorkerData代わりにを返す必要がありArcます。そうすれば、電話をかけstop()てデッドロックを引き起こすことができなくなります。

労働者を停止するフラグ

実際にワーカースレッドを停止したい場合は、フラグを使用して停止するように通知する必要があります。共有の形式でフラグを作成できますAtomicBool。

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

複数のスレッドと複数のタスク

複数の種類のタスクを処理し、複数のスレッドに分散させたい場合は、より一般的な例を次に示します。

を使用してすでに言及しましたmpsc。したがって、カスタムおよび列挙型SenderとReceiver一緒におよびを使用できます。TaskTaskResult

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Workerタスクの送信とタスク結果の受信とともにを使用する例は、次のようになります。

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

場合によってはSendTaskおよび/またはの実装が必要になることがありますTaskResult「送信特性を理解する」を確認してください。

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

aのtypeパラメーターはJoinHandle、スレッドの関数の戻り値の型である必要があります。

この場合、戻り値の型は空のタプル()、発音された単位です。可能な値が1つしかない場合に使用され、戻り値の型が指定されていない場合の関数の暗黙の「戻り値の型」です。

JoinHandle<()>関数が何も返さないことを表すために書くことができます。

(注:コードは、でいくつかの借用チェッカーの問題に遭遇しますself.call()。これはおそらくで解決する必要がありますが、それはArc<Mutex<Self>>別の質問です。)