Como posso gerar métodos assíncronos em um loop?
Eu tenho um vetor de objetos que tem um resolve()
método que usa reqwest
para consultar uma API da web externa. Depois de chamar o resolve()
método em cada objeto, desejo imprimir o resultado de cada solicitação.
Aqui está meu código meio assíncrono que compila e funciona (mas não realmente de forma assíncrona):
for mut item in items {
item.resolve().await;
item.print_result();
}
Tentei tokio::join!
gerar todas as chamadas assíncronas e esperar que terminassem, mas provavelmente estou fazendo algo errado:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
Aqui está o erro que estou recebendo:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
Como posso chamar os resolve()
métodos para todas as instâncias de uma vez?
Este código reflete a resposta - agora estou lidando com erros do verificador de empréstimo que realmente não entendo - devo anotar algumas das minhas variáveis 'static
?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
Respostas
Já que você deseja esperar pelo futuro em paralelo, você pode gerá- los em tarefas individuais que são executadas em paralelo. Como eles são executados independentemente uns dos outros e do thread que os gerou, você pode aguardar seus identificadores em qualquer ordem.
Idealmente, você escreveria algo assim:
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
Mas isso será rejeitado pelo verificador de empréstimo porque o futuro retornado por item.resolve()
contém uma referência emprestada a item
. A referência é passada para tokio::spawn()
outra thread, e o compilador não pode provar que item
sobreviverá a essa thread. (O mesmo tipo de problema é encontrado quando você deseja enviar referência a dados locais para um thread .)
Existem várias soluções possíveis para isso; o que considero mais elegante é mover os itens para o fechamento assíncrono para o qual foi passado tokio::spawn()
e ter a tarefa devolvê-los a você quando terminar. Basicamente, você consome o items
vetor para criar as tarefas e o reconstitui imediatamente a partir dos resultados esperados:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
Código executável no playground .
Observe que a futures
caixa contém uma join_allfunção semelhante ao que você precisa, exceto que pesquisa os futuros individuais sem garantir que eles funcionem em paralelo. Podemos escrever um genérico join_parallel
que usa join_all
, mas também usa tokio::spawn
para obter execução paralela:
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
Usando esta função, o código necessário para responder à pergunta se resume a apenas:
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
Novamente, código executável no playground .