Làm cách nào để tạo ra các phương thức không đồng bộ trong một vòng lặp?
Tôi có một vectơ đối tượng có resolve()
phương thức sử dụng reqwest
để truy vấn API web bên ngoài. Sau khi tôi gọi resolve()
phương thức trên mỗi đối tượng, tôi muốn in kết quả của mọi yêu cầu.
Đây là mã nửa không đồng bộ của tôi biên dịch và hoạt động (nhưng không thực sự không đồng bộ):
for mut item in items {
item.resolve().await;
item.print_result();
}
Tôi đã cố gắng sử dụng tokio::join!
để tạo ra tất cả các cuộc gọi không đồng bộ và đợi chúng kết thúc, nhưng có lẽ tôi đang làm sai:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
Đây là lỗi tôi gặp phải:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
Làm cách nào để gọi các resolve()
phương thức cho tất cả các trường hợp cùng một lúc?
Mã này phản ánh câu trả lời - bây giờ tôi đang xử lý các lỗi của trình kiểm tra mượn mà tôi không thực sự hiểu - tôi có nên chú thích một số biến của mình với 'static
không?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
Trả lời
Vì bạn muốn chờ đợi tương lai song song, bạn có thể tạo chúng thành các nhiệm vụ riêng lẻ chạy song song. Vì chúng chạy độc lập với nhau và của chuỗi sinh ra chúng, bạn có thể chờ đợi các chốt của chúng theo bất kỳ thứ tự nào.
Tốt nhất bạn nên viết một cái gì đó như thế này:
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
Nhưng điều này sẽ bị từ chối bởi người kiểm tra đi vay vì tương lai được trả lại bằng cách item.resolve()
giữ một tham chiếu đã mượn item
. Tham chiếu được chuyển cho tokio::spawn()
luồng đó chuyển sang luồng khác và trình biên dịch không thể chứng minh rằng item
sẽ tồn tại lâu hơn luồng đó. (Vấn đề tương tự cũng gặp phải khi bạn muốn gửi tham chiếu đến dữ liệu cục bộ đến một chuỗi .)
Có một số giải pháp khả thi cho việc này; cách mà tôi thấy thanh lịch nhất là chuyển các mục vào vùng đóng không đồng bộ được chuyển tới tokio::spawn()
và giao nhiệm vụ lại cho bạn sau khi hoàn thành. Về cơ bản, bạn sử dụng items
vector để tạo các nhiệm vụ và ngay lập tức hoàn thiện nó từ các kết quả đã chờ đợi:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
Mã chạy được trong sân chơi .
Lưu ý rằng futures
thùng chứa một join_allchức năng tương tự như những gì bạn cần, ngoại trừ nó thăm dò các tương lai riêng lẻ mà không đảm bảo rằng chúng chạy song song. Chúng tôi có thể viết một chung join_parallel
sử dụng join_all
, nhưng cũng sử dụng tokio::spawn
để thực hiện song song:
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
Sử dụng chức năng này, mã cần thiết để trả lời câu hỏi sẽ chỉ đơn giản là:
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
Một lần nữa, mã chạy được trong sân chơi .