Como iniciar e parar um thread de trabalho
Eu tenho o seguinte requisito que é padrão em outras linguagens de programação, mas não sei como fazer no Rust.
Eu tenho uma classe, quero escrever um método para gerar um thread de trabalho que satisfaça 2 condições:
- Depois de gerar o thread de trabalho, a função retorna (para que outro lugar não precise esperar)
- Existe um mecanismo para interromper este segmento.
Por exemplo, aqui está meu código fictício:
struct A {
thread: JoinHandle<?>,
}
impl A {
pub fn run(&mut self) -> Result<()>{
self.thread = thread::spawn(move || {
let mut i = 0;
loop {
self.call();
i = 1 + i;
if i > 5 {
return
}
}
});
Ok(())
}
pub fn stop(&mut self) -> std::thread::Result<_> {
self.thread.join()
}
pub fn call(&mut self) {
println!("hello world");
}
}
fn main() {
let mut a = A{};
a.run();
}
Eu tenho um erro em thread: JoinHandle<?>
. Qual é o tipo de thread neste caso. E meu código está correto para iniciar e interromper um thread de trabalho?
Respostas
Resumindo, o T
in join()em a JoinHandleJoinHandle<?>
precisaria ser, JoinHandle<()>
pois o fechamento não retorna nada , ou seja, ()(unidade) .
Além disso, seu código fictício contém alguns problemas adicionais.
- O tipo de retorno de
run()
está incorreto e deveria pelo menos serResult<(), ()>
. - O
thread
campo precisaOption<JoinHandle<()>
ser capaz de lidar com ofn stop(&mut self)
que join()consomeJoinHandle
. - No entanto, você está tentando passar
&mut self
para o encerramento, o que traz muito mais problemas, resumindo-se a várias referências mutáveis- Isso poderia ser resolvido com, por exemplo
Mutex<A>
. No entanto, se você ligarstop()
, isso pode levar a um deadlock.
- Isso poderia ser resolvido com, por exemplo
Porém, já que era um código fictício, e você esclareceu nos comentários. Deixe-me tentar esclarecer o que você quis dizer com alguns exemplos. Isso inclui reescrever seu código fictício.
Resultado depois que o trabalhador terminar
Se você não precisa acessar os dados enquanto o thread de trabalho está em execução, você pode fazer um novo struct WorkerData
. Em seguida, run()
você copia / clona os dados de que precisa A
(ou como eu os renomei Worker
). Então, no fechamento, você finalmente retorna data
novamente, para que possa adquiri-lo completamente join()
.
use std::thread::{self, JoinHandle};
struct WorkerData {
...
}
impl WorkerData {
pub fn call(&mut self) {
println!("hello world");
}
}
struct Worker {
thread: Option<JoinHandle<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
Self { thread: None }
}
pub fn run(&mut self) {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
self.thread = Some(thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
// Return `data` so we get in through `join()`
return data;
}
}
}));
}
pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
if let Some(handle) = self.thread.take() {
Some(handle.join())
} else {
None
}
}
}
Você realmente não precisa thread
ser Option<JoinHandle<WorkerData>>
e, em vez disso, pode apenas usar JoinHandle<WorkerData>>
. Porque se você quisesse chamar run()
novamente, seria mais fácil reatribuir a variável que contém o Worker
.
Portanto, agora podemos simplificar Worker
, removendo Option
e alterando stop
para consumir thread
, juntamente com a criação new() -> Self
no lugar de run(&mut self)
.
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<WorkerData>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let mut data = WorkerData {};
let thread = thread::spawn(move || {
let mut i = 0;
loop {
data.call();
i = 1 + i;
if i > 5 {
return data;
}
}
});
Self { thread }
}
pub fn stop(self) -> thread::Result<WorkerData> {
self.thread.join()
}
}
Compartilhado WorkerData
Se você quiser reter as referências WorkerData
entre vários threads, precisará usar Arc. Já que você também deseja poder alterá-lo, você precisará usar um Mutex.
Se você apenas sofrerá mutações em um único thread, poderá alternativamente, você a RwLock, que, em comparação com a Mutex
, permitirá que você bloqueie e obtenha várias referências imutáveis ao mesmo tempo.
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let thread = thread::spawn({
let data = data.clone();
move || {
let mut i = 0;
loop {
if let Ok(mut data) = data.write() {
data.call();
}
i = 1 + i;
if i > 5 {
return;
}
}
}
});
Self { thread, data }
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Se você adicionar um método para poder obter data
na forma de Arc<RwLock<WorkerData>>
. Então, se você clonar Arc
e bloquear (o interno RwLock
) antes de chamar stop()
, isso resultará em um deadlock. Para evitar isso, qualquer data()
método deve retornar &WorkerData
ou em &mut WorkerData
vez do Arc
. Dessa forma, você não conseguiria ligar stop()
e causar um deadlock.
Sinalizar para parar o trabalhador
Se você realmente deseja interromper o thread de trabalho, deverá usar um sinalizador para sinalizá-lo. Você pode criar um sinalizador na forma de um compartilhado AtomicBool.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
struct Worker {
thread: JoinHandle<()>,
data: Arc<RwLock<WorkerData>>,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new() -> Self {
// Create `WorkerData` and copy/clone whatever is needed from `self`
let data = Arc::new(RwLock::new(WorkerData {}));
let stop_flag = Arc::new(AtomicBool::new(false));
let thread = thread::spawn({
let data = data.clone();
let stop_flag = stop_flag.clone();
move || {
// let mut i = 0;
loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
if let Ok(mut data) = data.write() {
data.call();
}
// i = 1 + i;
// if i > 5 {
// return;
// }
}
}
});
Self {
thread,
data,
stop_flag,
}
}
pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
self.stop_flag.store(true, Ordering::Relaxed);
self.thread.join()?;
// You might be able to unwrap and get the inner `WorkerData` here
Ok(self.data)
}
}
Múltiplos threads e múltiplas tarefas
Se você deseja que vários tipos de tarefas sejam processadas, espalhadas por vários threads, aqui está um exemplo mais generalizado.
Você já mencionou o uso mpsc. Portanto, você pode usar um Sendere Receiverjunto com um custom Task
e TaskResult
enum.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
pub enum Task {
...
}
pub enum TaskResult {
...
}
pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;
pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;
struct Worker {
threads: Vec<JoinHandle<()>>,
task_sender: TaskSender,
result_receiver: ResultReceiver,
stop_flag: Arc<AtomicBool>,
}
impl Worker {
pub fn new(num_threads: usize) -> Self {
let (task_sender, task_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let stop_flag = Arc::new(AtomicBool::new(false));
Self {
threads: (0..num_threads)
.map(|_| {
let task_receiver = task_receiver.clone();
let result_sender = result_sender.clone();
let stop_flag = stop_flag.clone();
thread::spawn(move || loop {
if stop_flag.load(Ordering::Relaxed) {
break;
}
let task_receiver = task_receiver.lock().unwrap();
if let Ok(task) = task_receiver.recv() {
drop(task_receiver);
// Perform the `task` here
// If the `Task` results in a `TaskResult` then create it and send it back
let result: TaskResult = ...;
// The `SendError` can be ignored as it only occurs if the receiver
// has already been deallocated
let _ = result_sender.send(result);
} else {
break;
}
})
})
.collect(),
task_sender,
result_receiver,
stop_flag,
}
}
pub fn stop(self) -> Vec<thread::Result<()>> {
drop(self.task_sender);
self.stop_flag.store(true, Ordering::Relaxed);
self.threads
.into_iter()
.map(|t| t.join())
.collect::<Vec<_>>()
}
#[inline]
pub fn request(&mut self, task: Task) {
self.task_sender.send(task).unwrap();
}
#[inline]
pub fn result_receiver(&mut self) -> &ResultReceiver {
&self.result_receiver
}
}
Um exemplo de uso de Worker
junto com o envio de tarefas e recebimento de resultados de tarefas ficaria assim:
fn main() {
let mut worker = Worker::new(4);
// Request that a `Task` is performed
worker.request(task);
// Receive a `TaskResult` if any are pending
if let Ok(result) = worker.result_receiver().try_recv() {
// Process the `TaskResult`
}
}
Em alguns casos, você pode precisar implementar Send
para Task
e / ou TaskResult
. Confira "Compreendendo o traço Send" .
unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
O parâmetro de tipo de a JoinHandle
deve ser o tipo de retorno da função do segmento.
Nesse caso, o tipo de retorno é uma tupla vazia ()
, unidade pronunciada . É usado quando há apenas um valor possível e é o "tipo de retorno" implícito de funções quando nenhum tipo de retorno é especificado.
Você pode apenas escrever JoinHandle<()>
para representar que a função não retornará nada.
(Observação: seu código terá alguns problemas com o verificador de empréstimo self.call()
, que provavelmente precisarão ser resolvidos Arc<Mutex<Self>>
, mas essa é outra questão.)