非同期メソッドをループで生成するにはどうすればよいですか?
外部WebAPIのクエリにresolve()
使用するメソッドを持つオブジェクトのベクトルがありreqwest
ます。resolve()
各オブジェクトでメソッドを呼び出した後、すべてのリクエストの結果を出力したいと思います。
コンパイルして動作する(実際には非同期ではありませんが)半非同期コードは次のとおりです。
for mut item in items {
item.resolve().await;
item.print_result();
}
私はtokio::join!
すべての非同期呼び出しを生成し、それらが終了するのを待つために使用しようとしましたが、おそらく何か間違ったことをしています:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
これが私が得ているエラーです:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
resolve()
すべてのインスタンスのメソッドを一度に呼び出すにはどうすればよいですか?
このコードは答えを反映しています-今私は本当に理解していない借用チェッカーエラーを扱っています-変数のいくつかに注釈を付ける必要があり'static
ますか?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
回答
先物を並行して待ちたいので、並行して実行される個々のタスクに先物をスポーンすることができます。それらは互いに独立して実行され、それらを生成したスレッドからも独立して実行されるため、任意の順序でハンドルを待つことができます。
理想的には、次のように記述します。
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
ただし、によって返される未来はitem.resolve()
への借用参照を保持しているため、これは借用チェッカーによって拒否されitem
ます。参照が渡されtokio::spawn()
て別のスレッドに渡され、コンパイラーはそれitem
がそのスレッドよりも長生きすることを証明できません。(ローカルデータへの参照をスレッドに送信する場合にも、同じ種類の問題が発生します。)
これにはいくつかの可能な解決策があります。私が最もエレガントだと思うのは、渡された非同期クロージャにアイテムを移動し、tokio::spawn()
完了したらタスクにそれらを返してもらうことです。基本的に、items
ベクトルを使用してタスクを作成し、待機中の結果からすぐに再構成します。
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
遊び場で実行可能なコード。
futures
クレートには、join_all並行して実行されることを保証せずに個々の先物をポーリングすることを除いて、必要なものと同様の関数が含まれていることに注意してください。をjoin_parallel
使用するジェネリックを記述できますが、並列実行を取得するためにjoin_all
も使用tokio::spawn
します。
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
この関数を使用すると、質問に答えるために必要なコードは次のようになります。
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
繰り返しますが、遊び場で実行可能なコード。