작업자 스레드를 시작하고 중지하는 방법

Jan 03 2021

다른 프로그래밍 언어에서 표준 인 다음 요구 사항이 있지만 Rust에서 수행하는 방법을 모르겠습니다.

클래스가 있는데 두 가지 조건을 충족하는 작업자 스레드를 생성하는 메서드를 작성하고 싶습니다.

  • 작업자 스레드를 생성 한 후 함수가 반환됩니다 (따라서 다른 곳은 기다릴 필요가 없음).
  • 이 스레드를 중지하는 메커니즘이 있습니다.

예를 들어, 다음은 내 더미 코드입니다.

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

에 오류가 thread: JoinHandle<?>있습니다. 이 경우 스레드 유형은 무엇입니까? 내 코드가 작업자 스레드를 시작하고 중지하는 데 맞습니까?

답변

3 vallentin Jan 03 2021 at 02:07

요컨대, Tin은 join()에 JoinHandle전달 된 클로저의 결과를 반환합니다 thread::spawn(). 귀하의 경우 그래서 것은 JoinHandle<?>할 필요가 JoinHandle<()>귀하의 폐쇄를 반환으로 아무것도 즉, ()(단위) .

그 외에 더미 코드에는 몇 가지 추가 문제가 있습니다.

  • 의 반환 유형 run()이 올바르지 않으며 최소한이어야합니다 Result<(), ()>.
  • thread필드는 할 필요가 Option<JoinHandle<()>할 수있는 처리 fn stop(&mut self) 로 join()소모 JoinHandle.
  • 그러나, 당신은 &mut self클로저 로 전달하려고하는데 , 이것은 더 많은 문제를 가져 와서 여러 개의 가변 참조로 끓어 오릅니다.
    • 이것은 예를 들어 해결할 수 있습니다 Mutex<A>. 그러나 전화하면 stop()대신 교착 상태가 발생할 수 있습니다.

그러나 그것은 더미 코드이기 때문에 주석에서 명확히했습니다. 몇 가지 예를 들어 당신이 의미하는 바를 명확히하겠습니다. 여기에는 더미 코드를 다시 작성하는 것도 포함됩니다.

작업자 완료 후 결과

작업자 스레드가 실행되는 동안 데이터에 액세스 할 필요가 없으면 새 struct WorkerData. 그런 다음 run()필요한 데이터를 복사 / 복제합니다 A(또는 이름을 변경 했음 Worker). 그런 다음 클로저에서 마침내 data다시 돌아 오므로 join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

당신은 정말로 thread될 필요 가 없으며 Option<JoinHandle<WorkerData>>대신 JoinHandle<WorkerData>>. run()다시 호출 하려면 Worker.

그래서 지금 우리는 간단하게 할 수 Worker을 제거, Option변경 stop소비 thread생성과 함께, 대신 new() -> Self의 자리에 run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

공유 WorkerData

WorkerData여러 스레드간에 참조를 유지 하려면 Arc. 추가로 변경할 수 있기를 원하므로 Mutex.

단일 스레드 내에서만 변경하는 경우 RwLock, a 와 비교 Mutex하여 동시에 여러 개의 불변 참조를 잠그고 얻을 수 있는 을 사용할 수 있습니다.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

data형식으로 얻을 수있는 메서드를 추가하면 Arc<RwLock<WorkerData>>. 그런 다음 을 호출하기 전에을 복제 Arc하고 잠그면 (inner RwLock) stop()교착 상태가 발생합니다. 이를 방지하기 위해 어떤 data()방법을 반환해야 &WorkerData또는 &mut WorkerData대신의 Arc. 그렇게하면 전화를 걸 수없고 stop()교착 상태가 발생할 수 있습니다.

작업자 중지 플래그

실제로 작업자 스레드를 중지하려면 플래그를 사용하여 신호를 보내야합니다. 공유 형식으로 플래그를 만들 수 있습니다 AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

여러 스레드 및 여러 작업

여러 종류의 작업을 처리하고 여러 스레드에 분산 시키려면 더 일반적인 예가 있습니다.

이미 mpsc. 그래서 당신은 사용할 수 있습니다 Sender및 Receiver사용자 정의와 함께을 Task하고 TaskResult열거.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Worker작업 전송 및 작업 결과 수신과 함께 를 사용하는 예 는 다음과 같습니다.

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

몇 가지 경우에 당신은 구현해야 할 수도 있습니다 Send에 대한 Task및 / 또는 TaskResult. "전송 특성 이해"를 확인하십시오 .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

a의 유형 매개 변수 JoinHandle는 스레드 함수의 반환 유형이어야합니다.

이 경우 반환 유형은 unit으로() 발음 되는 빈 튜플 입니다. 가능한 값이 하나만있을 때 사용되며 반환 유형이 지정되지 않은 경우 함수의 암시 적 "반환 유형"입니다.

JoinHandle<()>함수가 아무것도 반환하지 않음을 나타 내기 위해 작성할 수 있습니다 .

(참고 : 코드에서에서 일부 차용 검사기 문제 self.call()가 발생하므로으로 해결해야 할 수 Arc<Mutex<Self>>있지만 이는 또 다른 질문입니다.)