Tokio0.2で非静的な未来を生み出す

Dec 13 2020

いくつかのfutureを並行して実行し、すべてのfutureが終了した後にのみ戻る必要があるasyncメソッドがあります。ただし、参照によって、存続しないデータが渡されます'static(mainメソッドのある時点でドロップされます)。概念的には、これに似ています(プレイグラウンド):

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]);
}

さて、トキオは、先物を止めずにハンドルを落とすことができるのでspawn、渡された先物が'static生涯有効であることを望んでいます。これは、上記の例で次のエラーメッセージが表示されることを意味します。

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   |
12 | async fn parallel_stuff(array: &[u64]) {
   |                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
   |                         |
   |                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
   |                    ----- ...and is required to live as long as `'static` here

だから私の質問は:現在のコンテキストでのみ有効な先物を生成し、それらがすべて完了するまで待つことができるようにするにはどうすればよいですか?

(これがtokio 0.3で可能であるが、0.2では不可能である場合、当面は多くのgit依存関係が含まれますが、私はまだ興味があります)

回答

3 AliceRyhl Dec 14 2020 at 17:35

'staticasyncRustからnon-futureをスポーンすることはできません。これは、非同期関数がいつでもキャンセルされる可能性があるため、呼び出し元が生成されたタスクよりも実際に長生きすることを保証する方法がないためです。

非同期タスクのスコープ付きスポーンを許可するさまざまなクレートがあることは事実ですが、これらのクレートは非同期コードからは使用できません。彼ら許可するのは、非非同期コードからスコープ付き非同期タスクを生成することです。それらを生成した非同期コードは非同期ではないため、いつでもキャンセルできないため、これは上記の問題に違反しません。

一般に、これには2つのアプローチがあります。

  1. 通常の参照ではなく、'staticを使用してタスクを生成しArcます。
  2. スポーンする代わりに、先物クレートからの並行性プリミティブを使用します。

この回答は東京0.2.xとの両方に当てはまることに注意してください0.3.x


通常、静的タスクを生成して使用するArcには、問題の値の所有権が必要です。これは、関数が参照によって引数をとったため、データのクローンを作成せずにこの手法を使用できないことを意味します。

async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can shared it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

データへの変更可能な参照があり、データがSizedスライスではない場合、一時的にデータの所有権を取得できることに注意してください。

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
    
    // Put back the vector where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
}

別のオプションは、先物クレートからの並行性プリミティブを使用することです。これらには、非'staticデータを処理するという利点がありますが、タスクを複数のスレッドで同時に実行できないという欠点があります。

とにかく非同期コードはほとんどの時間をIOの待機に費やす必要があるため、多くのワークフローではこれはまったく問題ありません。

1つのアプローチはを使用することFuturesUnorderedです。これは、多くの異なる先物を保存できる特別なコレクションでnextあり、それらすべてを同時に実行し、最初の先物が終了すると戻る機能を備えています。(このnext機能は、StreamExtがインポートされた場合にのみ使用できます)

あなたはそれをこのように使うことができます:

use futures::stream::{FuturesUnordered, StreamExt};

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
        let task = do_sth(i);
        tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
}

注:FuturesUnordered定義する必要があります後に共有値。そうしないと、間違った順序でドロップされることによって引き起こされる借用エラーが発生します。


別のアプローチは、を使用することStreamです。ストリームでは、を使用できますbuffer_unordered。これは、FuturesUnordered内部で使用するユーティリティです。

use futures::stream::StreamExt;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
        .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
        .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
        .for_each(|()| async {})
        .await;
}

どちらの場合も、インポートStreamExtは、拡張トレイトをインポートしないとストリームで使用できないさまざまなメソッドを提供するため、重要であることに注意してください。