Tokio 0.2 ile statik olmayan bir gelecek yaratın
Bazı vadeli işlemleri paralel olarak yürüten ve ancak tüm vadeli işlemler bittikten sonra geri dönen bir zaman uyumsuz yöntemim var. Ancak, o kadar uzun yaşamayan bazı veriler referans olarak aktarılır 'static
(ana yöntemde bir noktada bırakılacaktır). Kavramsal olarak şuna benzer ( Oyun Alanı ):
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in array {
let task = spawn(do_sth(i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
#[tokio::main]
async fn main() {
parallel_stuff(&[3, 1, 4, 2]);
}
Şimdi, tokio devredilen vadeli işlemlerin ömür boyu spawn
geçerli olmasını istiyor 'static
, çünkü gelecek durmadan kolu bırakabilirim. Bu, yukarıdaki örneğimin bu hata mesajını oluşturduğu anlamına gelir:
error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:12:25
|
12 | async fn parallel_stuff(array: &[u64]) {
| ^^^^^ ------ this data with an anonymous lifetime `'_`...
| |
| ...is captured here...
...
15 | let task = spawn(do_sth(i));
| ----- ...and is required to live as long as `'static` here
Öyleyse sorum şu: Sadece mevcut bağlam için geçerli olan ve sonrasında hepsi tamamlanana kadar bekleyebileceğim futures'ları nasıl yaratırım?
(eğer bu tokio 0.3'te mümkünse 0.2 değil ise, şu an için çok fazla git bağımlılığı içermesine rağmen hala ilgileniyorum)
Yanıtlar
'static
Async Rust'tan non-Future oluşturmak mümkün değildir . Bunun nedeni, herhangi bir zaman uyumsuz işlevin herhangi bir zamanda iptal edilebilmesidir, bu nedenle, arayanın ortaya çıkan görevleri gerçekten geride bırakacağını garanti etmenin bir yolu yoktur.
Zaman uyumsuz görevlerin kapsamlı olarak ortaya çıkmasına izin veren çeşitli kasalar olduğu doğrudur, ancak bu kasalar eşzamansız koddan kullanılamaz. İzin verdikleri şey , eşzamansız olmayan koddan kapsamlı eşzamansız görevler oluşturmaktır . Bu, yukarıdaki sorunu ihlal etmez, çünkü onları oluşturan eşzamansız olmayan kod, eşzamansız olmadığı için hiçbir zaman iptal edilemez.
Genel olarak buna iki yaklaşım vardır:
- Sıradan referanslar yerine
'static
kullanarak bir görev oluşturunArc
. - Doğmak yerine vadeli işlemler sandığındaki eşzamanlılık ilkellerini kullanın.
Bu cevap Tokio için geçerlidir unutmayın 0.2.x
ve 0.3.x
.
Genellikle statik bir görev oluşturmak ve kullanmak için Arc
, söz konusu değerlerin sahipliğine sahip olmalısınız. Bu, işleviniz bağımsız değişkeni referans alarak aldığından, bu tekniği verileri klonlamadan kullanamayacağınız anlamına gelir.
async fn do_sth(with: Arc<[u64]>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &[u64]) {
// Make a clone of the data so we can shared it across tasks.
let shared: Arc<[u64]> = Arc::from(array);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
Verilere değişebilir bir referansınız varsa ve veriler Sized
bir dilim değilse, geçici olarak sahipliğini almanın mümkün olduğuna dikkat edin.
async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &mut Vec<u64>) {
// Swap the array with an empty one to temporarily take ownership.
let vec = std::mem::take(array);
let shared = Arc::new(vec);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
// Put back the vector where we took it from.
// This works because there is only one Arc left.
*array = Arc::try_unwrap(shared).unwrap();
}
Diğer bir seçenek de vadeli işlemler sandığındaki eşzamanlılık ilkelerini kullanmaktır. Bunlar 'static
veri olmayanlarla çalışma avantajına sahiptir , ancak görevlerin aynı anda birden fazla iş parçacığı üzerinde çalışamayacak olması dezavantajına sahiptir.
Zaman uyumsuz kod, zamanının çoğunu yine de IO'yu bekleyerek geçirmesi gerektiğinden, çoğu iş akışı için bu tamamen iyidir.
Bir yaklaşım kullanmaktır FuturesUnordered. Bu, birçok farklı futures'ı saklayabilen özel bir koleksiyon ve next
hepsini aynı anda çalıştıran ve birincisi bittiğinde geri dönen bir işleve sahip. ( next
İşlev yalnızca StreamExt
içe aktarıldığında kullanılabilir)
Bunu şu şekilde kullanabilirsiniz:
use futures::stream::{FuturesUnordered, StreamExt};
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks = FuturesUnordered::new();
for i in array {
let task = do_sth(i);
tasks.push(task);
}
// This loop runs everything concurrently, and waits until they have
// all finished.
while let Some(()) = tasks.next().await { }
}
Not:FuturesUnordered
tanımlanmalıdır sonra paylaşılan değer. Aksi takdirde, yanlış sırayla düşürülmesinden kaynaklanan bir ödünç alma hatası alırsınız.
Başka bir yaklaşım da a kullanmaktır Stream
. Akışlarla kullanabilirsiniz buffer_unordered. Bu, FuturesUnordered
dahili olarak kullanan bir yardımcı programdır .
use futures::stream::StreamExt;
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
// Create a stream going through the array.
futures::stream::iter(array)
// For each item in the stream, create a future.
.map(|i| do_sth(i))
// Run at most 10 of the futures concurrently.
.buffer_unordered(10)
// Since Streams are lazy, we must use for_each or collect to run them.
// Here we use for_each and do nothing with the return value from do_sth.
.for_each(|()| async {})
.await;
}
Her iki durumda da, StreamExt
uzantı özelliğini içe aktarmadan akışlarda kullanılamayan çeşitli yöntemler sağladığından içe aktarmanın önemli olduğunu unutmayın.