Come posso generare metodi asincroni in un ciclo?
Ho un vettore di oggetti che hanno un resolve()
metodo che utilizza reqwest
per interrogare un'API web esterna. Dopo aver chiamato il resolve()
metodo su ogni oggetto, voglio stampare il risultato di ogni richiesta.
Ecco il mio codice semiasincrono che si compila e funziona (ma non proprio in modo asincrono):
for mut item in items {
item.resolve().await;
item.print_result();
}
Ho provato a utilizzare tokio::join!
per generare tutte le chiamate asincrone e attendere che finiscano, ma probabilmente sto facendo qualcosa di sbagliato:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
Ecco l'errore che ricevo:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
Come posso chiamare i resolve()
metodi per tutte le istanze contemporaneamente?
Questo codice riflette la risposta - ora ho a che fare con errori di verifica del prestito che non capisco veramente - devo annotare alcune delle mie variabili con 'static
?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
Risposte
Dal momento che vuoi aspettare sul futuro in parallelo, puoi generarli in singole attività che vengono eseguite in parallelo. Dal momento che corrono indipendentemente l'uno dall'altro e dal thread che li ha generati, puoi attendere le loro maniglie in qualsiasi ordine.
Idealmente dovresti scrivere qualcosa del genere:
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
Ma questo verrà rifiutato dal controllore del prestito perché il futuro restituito da item.resolve()
contiene un riferimento preso in prestito a item
. Il riferimento viene passato a tokio::spawn()
chi lo passa a un altro thread e il compilatore non può dimostrare che item
sopravviverà a quel thread. (Lo stesso tipo di problema si verifica quando si desidera inviare un riferimento a dati locali a un thread .)
Ci sono diverse possibili soluzioni a questo; quello che trovo più elegante è spostare gli elementi nella chiusura asincrona a cui sono passati tokio::spawn()
e chiedere all'attività di restituirli una volta terminato. Fondamentalmente si consuma il items
vettore per creare le attività e immediatamente ricostituirlo dai risultati attesi:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
Codice eseguibile nel playground .
Nota che la futures
cassa contiene una join_allfunzione simile a quella di cui hai bisogno, tranne che esegue il polling dei singoli futures senza assicurarsi che funzionino in parallelo. Possiamo scrivere un generico join_parallel
che utilizza join_all
, ma utilizza anche tokio::spawn
per ottenere l'esecuzione parallela:
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
Usando questa funzione il codice necessario per rispondere alla domanda si riduce a:
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
Di nuovo, codice eseguibile nel playground .