Menelurkan masa depan non-statis dengan Tokio 0.2
Saya memiliki metode async yang harus mengeksekusi beberapa futures secara paralel, dan hanya kembali setelah semua futures selesai. Namun, itu melewati beberapa data dengan referensi yang tidak hidup selama 'static
(itu akan dihapus di beberapa titik dalam metode utama). Secara konseptual, ini mirip dengan ini ( Playground ):
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in array {
let task = spawn(do_sth(i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
#[tokio::main]
async fn main() {
parallel_stuff(&[3, 1, 4, 2]);
}
Sekarang, tokio ingin masa depan yang dilewati spawn
berlaku 'static
seumur hidup, karena saya bisa melepaskan pegangannya tanpa henti di masa depan. Itu berarti contoh saya di atas menghasilkan pesan kesalahan ini:
error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:12:25
|
12 | async fn parallel_stuff(array: &[u64]) {
| ^^^^^ ------ this data with an anonymous lifetime `'_`...
| |
| ...is captured here...
...
15 | let task = spawn(do_sth(i));
| ----- ...and is required to live as long as `'static` here
Jadi pertanyaan saya adalah: Bagaimana cara menelurkan kontrak berjangka yang hanya berlaku untuk konteks saat ini sehingga saya bisa menunggu sampai semuanya selesai?
(jika ini memungkinkan di tokio 0.3 tetapi tidak 0.2 saya masih tertarik, meskipun itu akan melibatkan banyak dependensi git untuk saat ini)
Jawaban
Tidak mungkin untuk menelurkan non- 'static
future dari async Rust. Ini karena fungsi asinkron apa pun dapat dibatalkan kapan saja, jadi tidak ada cara untuk menjamin bahwa pemanggil benar-benar hidup lebih lama dari tugas yang dimunculkan.
Memang benar bahwa ada berbagai peti yang memungkinkan munculnya cakupan tugas asinkron, tetapi peti ini tidak dapat digunakan dari kode asinkron. Apa yang mereka lakukan memungkinkan adalah untuk bertelur scoped async tugas dari non-async kode. Ini tidak melanggar masalah di atas, karena kode non-asinkron yang memunculkannya tidak dapat dibatalkan kapan pun, karena bukan asinkron.
Umumnya ada dua pendekatan untuk ini:
- Buat
'static
tugas dengan menggunakanArc
referensi biasa. - Gunakan primitif konkurensi dari peti berjangka alih-alih pemijahan.
Perhatikan bahwa jawaban ini berlaku untuk Tokio 0.2.x
dan 0.3.x
.
Umumnya untuk menelurkan tugas dan penggunaan statis Arc
, Anda harus memiliki kepemilikan nilai yang dipermasalahkan. Ini berarti bahwa karena fungsi Anda mengambil argumen sebagai referensi, Anda tidak dapat menggunakan teknik ini tanpa menggandakan data.
async fn do_sth(with: Arc<[u64]>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &[u64]) {
// Make a clone of the data so we can shared it across tasks.
let shared: Arc<[u64]> = Arc::from(array);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
Perhatikan bahwa jika Anda memiliki referensi yang bisa berubah ke data, dan datanya Sized
, yaitu bukan potongan, Anda dapat mengambil kepemilikannya untuk sementara.
async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &mut Vec<u64>) {
// Swap the array with an empty one to temporarily take ownership.
let vec = std::mem::take(array);
let shared = Arc::new(vec);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
// Put back the vector where we took it from.
// This works because there is only one Arc left.
*array = Arc::try_unwrap(shared).unwrap();
}
Pilihan lain adalah menggunakan primitif konkurensi dari peti berjangka. Ini memiliki keuntungan bekerja dengan non- 'static
data, tetapi kerugiannya bahwa tugas tidak akan dapat dijalankan di beberapa utas pada waktu yang sama.
Untuk banyak alur kerja, ini baik-baik saja, karena kode asinkron harus menghabiskan sebagian besar waktunya menunggu IO.
Salah satu pendekatannya adalah dengan menggunakan FuturesUnordered. Ini adalah koleksi khusus yang dapat menyimpan banyak masa depan yang berbeda, dan memiliki next
fungsi yang menjalankan semuanya secara bersamaan, dan kembali setelah yang pertama selesai. ( next
Fungsi ini hanya tersedia saat StreamExt
diimpor)
Anda bisa menggunakannya seperti ini:
use futures::stream::{FuturesUnordered, StreamExt};
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks = FuturesUnordered::new();
for i in array {
let task = do_sth(i);
tasks.push(task);
}
// This loop runs everything concurrently, and waits until they have
// all finished.
while let Some(()) = tasks.next().await { }
}
Catatan: The FuturesUnordered
harus didefinisikan setelah nilai bersama. Jika tidak, Anda akan mendapatkan kesalahan peminjaman yang disebabkan oleh kesalahan pesanan yang dijatuhkan.
Pendekatan lain adalah dengan menggunakan a Stream
. Dengan aliran, Anda dapat menggunakan buffer_unordered. Ini adalah utilitas yang menggunakan secara FuturesUnordered
internal.
use futures::stream::StreamExt;
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
// Create a stream going through the array.
futures::stream::iter(array)
// For each item in the stream, create a future.
.map(|i| do_sth(i))
// Run at most 10 of the futures concurrently.
.buffer_unordered(10)
// Since Streams are lazy, we must use for_each or collect to run them.
// Here we use for_each and do nothing with the return value from do_sth.
.for_each(|()| async {})
.await;
}
Perhatikan bahwa dalam kedua kasus tersebut, pengimporan StreamExt
penting karena menyediakan berbagai metode yang tidak tersedia di aliran tanpa mengimpor sifat ekstensi.