작업자 스레드를 시작하고 중지하는 방법
다른 프로그래밍 언어에서 표준 인 다음 요구 사항이 있지만 Rust에서 수행하는 방법을 모르겠습니다.
클래스가 있는데 두 가지 조건을 충족하는 작업자 스레드를 생성하는 메서드를 작성하고 싶습니다.
- 작업자 스레드를 생성 한 후 함수가 반환됩니다 (따라서 다른 곳은 기다릴 필요가 없음).
- 이 스레드를 중지하는 메커니즘이 있습니다.
예를 들어, 다음은 내 더미 코드입니다.
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
에 오류가 thread: JoinHandle<?>
있습니다. 이 경우 스레드 유형은 무엇입니까? 내 코드가 작업자 스레드를 시작하고 중지하는 데 맞습니까?
답변
요컨대, T
in은 join()에 JoinHandleJoinHandle<?>
할 필요가 JoinHandle<()>
귀하의 폐쇄를 반환으로 아무것도 즉, ()(단위) .
그 외에 더미 코드에는 몇 가지 추가 문제가 있습니다.
- 의 반환 유형
run()
이 올바르지 않으며 최소한이어야합니다Result<(), ()>
. thread
필드는 할 필요가Option<JoinHandle<()>
할 수있는 처리fn stop(&mut self)
로 join()소모JoinHandle
.- 그러나, 당신은
&mut self
클로저 로 전달하려고하는데 , 이것은 더 많은 문제를 가져 와서 여러 개의 가변 참조로 끓어 오릅니다.- 이것은 예를 들어 해결할 수 있습니다
Mutex<A>
. 그러나 전화하면stop()
대신 교착 상태가 발생할 수 있습니다.
- 이것은 예를 들어 해결할 수 있습니다
그러나 그것은 더미 코드이기 때문에 주석에서 명확히했습니다. 몇 가지 예를 들어 당신이 의미하는 바를 명확히하겠습니다. 여기에는 더미 코드를 다시 작성하는 것도 포함됩니다.
작업자 완료 후 결과
작업자 스레드가 실행되는 동안 데이터에 액세스 할 필요가 없으면 새 struct WorkerData
. 그런 다음 run()
필요한 데이터를 복사 / 복제합니다 A
(또는 이름을 변경 했음 Worker
). 그런 다음 클로저에서 마침내 data
다시 돌아 오므로 join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
당신은 정말로 thread
될 필요 가 없으며 Option<JoinHandle<WorkerData>>
대신 JoinHandle<WorkerData>>
. run()
다시 호출 하려면 Worker
.
그래서 지금 우리는 간단하게 할 수 Worker
을 제거, Option
변경 stop
소비 thread
생성과 함께, 대신 new() -> Self
의 자리에 run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
공유 WorkerData
WorkerData
여러 스레드간에 참조를 유지 하려면 Arc. 추가로 변경할 수 있기를 원하므로 Mutex.
단일 스레드 내에서만 변경하는 경우 RwLock, a 와 비교 Mutex
하여 동시에 여러 개의 불변 참조를 잠그고 얻을 수 있는 을 사용할 수 있습니다.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
data
형식으로 얻을 수있는 메서드를 추가하면 Arc<RwLock<WorkerData>>
. 그런 다음 을 호출하기 전에을 복제 Arc
하고 잠그면 (inner RwLock
) stop()
교착 상태가 발생합니다. 이를 방지하기 위해 어떤 data()
방법을 반환해야 &WorkerData
또는 &mut WorkerData
대신의 Arc
. 그렇게하면 전화를 걸 수없고 stop()
교착 상태가 발생할 수 있습니다.
작업자 중지 플래그
실제로 작업자 스레드를 중지하려면 플래그를 사용하여 신호를 보내야합니다. 공유 형식으로 플래그를 만들 수 있습니다 AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
여러 스레드 및 여러 작업
여러 종류의 작업을 처리하고 여러 스레드에 분산 시키려면 더 일반적인 예가 있습니다.
이미 mpsc. 그래서 당신은 사용할 수 있습니다 Sender및 Receiver사용자 정의와 함께을 Task
하고 TaskResult
열거.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Worker
작업 전송 및 작업 결과 수신과 함께 를 사용하는 예 는 다음과 같습니다.
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
몇 가지 경우에 당신은 구현해야 할 수도 있습니다 Send
에 대한 Task
및 / 또는 TaskResult
. "전송 특성 이해"를 확인하십시오 .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
a의 유형 매개 변수 JoinHandle
는 스레드 함수의 반환 유형이어야합니다.
이 경우 반환 유형은 unit으로()
발음 되는 빈 튜플 입니다. 가능한 값이 하나만있을 때 사용되며 반환 유형이 지정되지 않은 경우 함수의 암시 적 "반환 유형"입니다.
JoinHandle<()>
함수가 아무것도 반환하지 않음을 나타 내기 위해 작성할 수 있습니다 .
(참고 : 코드에서에서 일부 차용 검사기 문제 self.call()
가 발생하므로으로 해결해야 할 수 Arc<Mutex<Self>>
있지만 이는 또 다른 질문입니다.)