Çalışan iş parçacığı nasıl başlatılır ve durdurulur

Jan 03 2021

Diğer programlama dillerinde standart olan aşağıdaki bir gereksinimim var, ancak Rust'ta nasıl yapılacağını bilmiyorum.

Bir sınıfım var, 2 koşulu karşılayan bir işçi iş parçacığı oluşturmak için bir yöntem yazmak istiyorum:

  • Çalışan iş parçacığı oluşturulduktan sonra işlev geri döner (bu nedenle diğer yerin beklemesi gerekmez)
  • Bu ipliği durdurmak için bir mekanizma var.

Örneğin, işte sahte kodum:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

Adresinde bir hatam var thread: JoinHandle<?>. Bu durumda iplik türü nedir? Ve bir çalışan iş parçacığını başlatmak ve durdurmak için kodum doğru mu?

Yanıtlar

3 vallentin Jan 03 2021 at 02:07

Kısaca, Tin a join()on JoinHandle, geçilen kapanmanın sonucunu döndürür thread::spawn(). Senin durumunda böyle JoinHandle<?>olması gerekir JoinHandle<()>sizin kapanış döner olarak hiçbir şey yani ()(birim) .

Bunun dışında, sahte kodunuz birkaç ek sorun içerir.

  • Dönüş türü run()yanlış ve en azından olması gerekir Result<(), ()>.
  • threadAlan olması gerekir Option<JoinHandle<()>edebilmek için idare fn stop(&mut self) olarak join()tüketir JoinHandle.
  • Ancak, &mut selfkapanışa geçmeye çalışıyorsunuz , bu da çok daha fazla sorunu beraberinde getiriyor ve birden fazla değişken referansa indirgeniyor.
    • Bu, örneğin çözülebilir Mutex<A>. Ancak, ararsanız, stop()bunun yerine bir kilitlenmeye yol açabilir.

Ancak, sahte kod olduğundan ve yorumlarda açıkladınız. Birkaç örnekle ne demek istediğini açıklamama izin ver. Bu, sahte kodunuzu yeniden yazmamı içerir.

İşçi bittikten sonraki sonuç

Çalışan iş parçacığı çalışırken verilere erişmeniz gerekmiyorsa, yeni bir struct WorkerData. Sonra içinde run()ihtiyacınız olan verileri kopyalayın / klonlayın A(veya yeniden adlandırdığım gibi Worker). Sonra kapanışta nihayet datatekrar geri dönersiniz , böylece onu elde edebilirsiniz join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Gerçekten gerek yok threadolmaya Option<JoinHandle<WorkerData>>ve bunun yerine sadece kullanabilirsiniz JoinHandle<WorkerData>>. Çünkü run()tekrar aramak isteseydiniz , değişkeni Worker.

Şimdi biz kolaylaştırabilirsiniz Workerçıkarmadan, Optionve değiştirmek stoptüketmek threadoluşturarak birlikte yerine new() -> Selfyerine run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Paylaşılan WorkerData

Birden WorkerDataçok iş parçacığı arasındaki başvuruları saklamak istiyorsanız , kullanmanız gerekir Arc. Ek olarak onu değiştirebilmek istediğiniz için, bir Mutex.

Yalnızca tek bir iş parçacığı içinde mutasyona uğrayacaksanız RwLock, alternatif olarak , a ile karşılaştırıldığında Mutex, aynı anda birden fazla değişmez referansı kilitlemenize ve elde etmenize olanak tanıyan a'yı yapabilirsiniz.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Eğer bir yöntemi eklerseniz elde edebilmek için dataşeklinde Arc<RwLock<WorkerData>>. Ardından , aramadan önce klonlar Arcve kilitlerseniz (iç RwLock) stop(), bu bir kilitlenmeyle sonuçlanır. Bunu önlemek için herhangi bir data()yöntem dönmelidir &WorkerDataveya &mut WorkerDatayerine Arc. Bu şekilde stop()arayamaz ve bir çıkmaza neden olamazsınız.

İşçiyi durdurmak için işaretle

Çalışan iş parçacığını gerçekten durdurmak istiyorsanız, bunu yapmasını işaret etmek için bir bayrak kullanmanız gerekir. Paylaşılan şeklinde bir bayrak oluşturabilirsiniz AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Birden çok iş parçacığı ve birden çok görev

Birden çok türde görevin işlenmesini, birden çok iş parçacığına yayılmasını istiyorsanız, işte size daha genel bir örnek.

Kullanmaktan zaten bahsetmiştin mpsc. Böylece bir özel ve enum ile birlikte Senderve kullanabilirsiniz .ReceiverTaskTaskResult

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

WorkerGörev gönderme ve görev sonuçlarını alma ile birlikte kullanımına bir örnek daha sonra şöyle görünecektir:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

Bazı durumlarda uygulamak gerekebilir Sendiçin Taskve / veya TaskResult. Check out "Gönder özelliğini anlama" .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

A'nın tür parametresi JoinHandle, evre işlevinin dönüş türü olmalıdır.

Bu durumda, dönüş türü boş bir tuple (), telaffuz edilen birimdir . Yalnızca bir değer mümkün olduğunda kullanılır ve hiçbir dönüş türü belirtilmediğinde işlevlerin örtük "dönüş türü" dir.

JoinHandle<()>İşlevin hiçbir şey döndürmeyeceğini belirtmek için yazabilirsiniz .

(Not: Kodunuz, self.call()muhtemelen çözülmesi gereken bazı ödünç denetleyicisi sorunlarıyla karşılaşacaktır Arc<Mutex<Self>>, ancak bu başka bir soru.)