Cách bắt đầu và dừng một chuỗi công nhân

Jan 03 2021

Tôi có một yêu cầu sau đây là yêu cầu tiêu chuẩn trong các ngôn ngữ lập trình khác nhưng tôi không biết cách thực hiện trong Rust.

Tôi có một lớp, tôi muốn viết một phương thức để tạo ra một chuỗi công nhân thỏa mãn 2 điều kiện:

  • Sau khi tạo chuỗi công nhân, hàm sẽ được trả về (vì vậy nơi khác không cần đợi)
  • Có một cơ chế để dừng luồng này.

Ví dụ, đây là mã giả của tôi:

struct A {
    thread: JoinHandle<?>,
}

impl A {
    pub fn run(&mut self) -> Result<()>{
        self.thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                self.call();
                i = 1 + i;
                if i > 5 {
                    return
                }
            }
        });
        Ok(())
    }

    pub fn stop(&mut self) -> std::thread::Result<_> {
        self.thread.join()
    }

    pub fn call(&mut self) {
        println!("hello world");
    }
}

fn main() {
    let mut a = A{};
    a.run();
}

Tôi có một lỗi tại thread: JoinHandle<?>. Loại chủ đề trong trường hợp này là gì. Và mã của tôi có chính xác để bắt đầu và dừng một luồng công nhân không?

Trả lời

3 vallentin Jan 03 2021 at 02:07

Nói tóm lại, Tin join()on a JoinHandletrả về kết quả của quá trình đóng được chuyển đến thread::spawn(). Vì vậy, trong trường hợp của bạn JoinHandle<?>sẽ cần phải là JoinHandle<()>như đóng cửa của bạn không trả lại , tức là ()(đơn vị) .

Ngoài ra, mã giả của bạn chứa một số vấn đề bổ sung.

  • Loại trả về run()là không chính xác và ít nhất sẽ phải như vậy Result<(), ()>.
  • Các threadlĩnh vực sẽ cần phải có Option<JoinHandle<()>để có thể xử lý fn stop(&mut self) như join()tiêu thụ các JoinHandle.
  • Tuy nhiên, bạn đang cố gắng chuyển &mut selfsang phần đóng, điều này mang lại nhiều vấn đề hơn, dẫn đến nhiều tham chiếu có thể thay đổi
    • Điều này có thể được giải quyết với ví dụ Mutex<A>. Tuy nhiên, nếu bạn gọi stop()thì điều đó có thể dẫn đến bế tắc.

Tuy nhiên, vì nó là mã giả, và bạn đã làm rõ trong phần bình luận. Hãy để tôi thử và làm rõ ý của bạn với một vài ví dụ. Điều này bao gồm việc tôi viết lại mã giả của bạn.

Kết quả sau khi công nhân làm xong

Nếu bạn không cần quyền truy cập vào dữ liệu trong khi chuỗi công nhân đang chạy, thì bạn có thể tạo mới struct WorkerData. Sau đó, run()bạn sao chép / sao chép dữ liệu bạn cần A(hoặc như tôi đã đổi tên nó Worker). Sau đó, cuối cùng bạn quay trở lại datamột lần nữa, vì vậy bạn có thể tiếp thu nó thông qua join().

use std::thread::{self, JoinHandle};

struct WorkerData {
    ...
}

impl WorkerData {
    pub fn call(&mut self) {
        println!("hello world");
    }
}

struct Worker {
    thread: Option<JoinHandle<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        Self { thread: None }
    }

    pub fn run(&mut self) {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        self.thread = Some(thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    // Return `data` so we get in through `join()`
                    return data;
                }
            }
        }));
    }

    pub fn stop(&mut self) -> Option<thread::Result<WorkerData>> {
        if let Some(handle) = self.thread.take() {
            Some(handle.join())
        } else {
            None
        }
    }
}

Bạn không thực sự cần threadphải có Option<JoinHandle<WorkerData>>và thay vào đó chỉ cần sử dụng JoinHandle<WorkerData>>. Bởi vì nếu bạn muốn gọi run()lại, sẽ dễ dàng hơn khi gán lại biến đang giữ Worker.

Vì vậy, bây giờ chúng ta có thể đơn giản hóa Worker, loại bỏ Optionvà thay đổi stopđể tiêu thụ threadthay vào đó, cùng với việc tạo new() -> Selftại chỗ run(&mut self).

use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<WorkerData>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let mut data = WorkerData {};

        let thread = thread::spawn(move || {
            let mut i = 0;
            loop {
                data.call();
                i = 1 + i;
                if i > 5 {
                    return data;
                }
            }
        });

        Self { thread }
    }

    pub fn stop(self) -> thread::Result<WorkerData> {
        self.thread.join()
    }
}

Được chia sẻ WorkerData

Nếu bạn muốn giữ lại các tham chiếu đến WorkerDatagiữa nhiều chuỗi, thì bạn cần phải sử dụng Arc. Vì bạn cũng muốn có thể thay đổi nó, bạn sẽ cần phải sử dụng một Mutex.

Nếu bạn chỉ thay đổi trong một chuỗi duy nhất, thì bạn có thể thay thế bạn a RwLock, so với a Mutexsẽ cho phép bạn khóa và nhận được nhiều tham chiếu bất biến cùng một lúc.

use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let thread = thread::spawn({
            let data = data.clone();
            move || {
                let mut i = 0;
                loop {
                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    i = 1 + i;
                    if i > 5 {
                        return;
                    }
                }
            }
        });

        Self { thread, data }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Nếu bạn thêm một phương thức để có thể lấy dataở dạng Arc<RwLock<WorkerData>>. Sau đó, nếu bạn sao chép Arcvà khóa nó (bên trong RwLock) trước khi gọi stop(), thì điều đó sẽ dẫn đến bế tắc. Để tránh điều đó, bất kỳ data()phương thức nào cũng nên trả về &WorkerDatahoặc &mut WorkerDatathay vì Arc. Bằng cách đó, bạn sẽ không thể gọi stop()và gây ra bế tắc.

Cờ để dừng công nhân

Nếu bạn thực sự muốn dừng chuỗi công nhân, thì bạn phải sử dụng cờ để báo hiệu nó làm như vậy. Bạn có thể tạo cờ dưới dạng chia sẻ AtomicBool.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

struct Worker {
    thread: JoinHandle<()>,
    data: Arc<RwLock<WorkerData>>,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new() -> Self {
        // Create `WorkerData` and copy/clone whatever is needed from `self`
        let data = Arc::new(RwLock::new(WorkerData {}));

        let stop_flag = Arc::new(AtomicBool::new(false));

        let thread = thread::spawn({
            let data = data.clone();
            let stop_flag = stop_flag.clone();
            move || {
                // let mut i = 0;
                loop {
                    if stop_flag.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Ok(mut data) = data.write() {
                        data.call();
                    }

                    // i = 1 + i;
                    // if i > 5 {
                    //     return;
                    // }
                }
            }
        });

        Self {
            thread,
            data,
            stop_flag,
        }
    }

    pub fn stop(self) -> thread::Result<Arc<RwLock<WorkerData>>> {
        self.stop_flag.store(true, Ordering::Relaxed);
        self.thread.join()?;
        // You might be able to unwrap and get the inner `WorkerData` here
        Ok(self.data)
    }
}

Nhiều chủ đề và nhiều nhiệm vụ

Nếu bạn muốn nhiều loại tác vụ được xử lý, trải dài trên nhiều chuỗi, thì đây là một ví dụ tổng quát hơn.

Bạn đã đề cập đến việc sử dụng mpsc. Vì vậy, bạn có thể sử dụng a Sendervà Receivercùng với một tùy chỉnh TaskTaskResultenum.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum Task {
    ...
}

pub enum TaskResult {
    ...
}

pub type TaskSender = Sender<Task>;
pub type TaskReceiver = Receiver<Task>;

pub type ResultSender = Sender<TaskResult>;
pub type ResultReceiver = Receiver<TaskResult>;

struct Worker {
    threads: Vec<JoinHandle<()>>,
    task_sender: TaskSender,
    result_receiver: ResultReceiver,
    stop_flag: Arc<AtomicBool>,
}

impl Worker {
    pub fn new(num_threads: usize) -> Self {
        let (task_sender, task_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();

        let task_receiver = Arc::new(Mutex::new(task_receiver));

        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            threads: (0..num_threads)
                .map(|_| {
                    let task_receiver = task_receiver.clone();
                    let result_sender = result_sender.clone();
                    let stop_flag = stop_flag.clone();

                    thread::spawn(move || loop {
                        if stop_flag.load(Ordering::Relaxed) {
                            break;
                        }

                        let task_receiver = task_receiver.lock().unwrap();

                        if let Ok(task) = task_receiver.recv() {
                            drop(task_receiver);

                            // Perform the `task` here

                            // If the `Task` results in a `TaskResult` then create it and send it back
                            let result: TaskResult = ...;
                            // The `SendError` can be ignored as it only occurs if the receiver
                            // has already been deallocated
                            let _ = result_sender.send(result);
                        } else {
                            break;
                        }
                    })
                })
                .collect(),
            task_sender,
            result_receiver,
            stop_flag,
        }
    }

    pub fn stop(self) -> Vec<thread::Result<()>> {
        drop(self.task_sender);

        self.stop_flag.store(true, Ordering::Relaxed);

        self.threads
            .into_iter()
            .map(|t| t.join())
            .collect::<Vec<_>>()
    }

    #[inline]
    pub fn request(&mut self, task: Task) {
        self.task_sender.send(task).unwrap();
    }

    #[inline]
    pub fn result_receiver(&mut self) -> &ResultReceiver {
        &self.result_receiver
    }
}

Một ví dụ về việc sử dụng Workercùng với việc gửi tác vụ và nhận kết quả tác vụ, sau đó sẽ trông như thế này:

fn main() {
    let mut worker = Worker::new(4);

    // Request that a `Task` is performed
    worker.request(task);

    // Receive a `TaskResult` if any are pending
    if let Ok(result) = worker.result_receiver().try_recv() {
        // Process the `TaskResult`
    }
}

Trong một số trường hợp, bạn có thể cần phải triển khai Sendcho Taskvà / hoặc TaskResult. Kiểm tra "Hiểu đặc điểm Gửi" .

unsafe impl Send for Task {}
unsafe impl Send for TaskResult {}
1 ddulaney Jan 03 2021 at 00:45

Tham số kiểu của a JoinHandlephải là kiểu trả về của hàm của luồng.

Trong trường hợp này, kiểu trả về là một bộ giá trị trống (), đơn vị được phát âm . Nó được sử dụng khi chỉ có một giá trị duy nhất có thể và là "kiểu trả về" ngầm định của các hàm khi không có kiểu trả về nào được chỉ định.

Bạn chỉ có thể viết JoinHandle<()>để biểu thị rằng hàm sẽ không trả về bất cứ thứ gì.

(Lưu ý: Mã của bạn sẽ gặp phải một số vấn đề với trình kiểm tra khoản vay self.call(), có thể cần phải giải quyết Arc<Mutex<Self>>, nhưng đó là một câu hỏi khác.)