Come posso generare metodi asincroni in un ciclo?

Aug 16 2020

Ho un vettore di oggetti che hanno un resolve()metodo che utilizza reqwestper interrogare un'API web esterna. Dopo aver chiamato il resolve()metodo su ogni oggetto, voglio stampare il risultato di ogni richiesta.

Ecco il mio codice semiasincrono che si compila e funziona (ma non proprio in modo asincrono):

for mut item in items {
    item.resolve().await;

    item.print_result();
}

Ho provato a utilizzare tokio::join!per generare tutte le chiamate asincrone e attendere che finiscano, ma probabilmente sto facendo qualcosa di sbagliato:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

Ecco l'errore che ricevo:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

Come posso chiamare i resolve()metodi per tutte le istanze contemporaneamente?


Questo codice riflette la risposta - ora ho a che fare con errori di verifica del prestito che non capisco veramente - devo annotare alcune delle mie variabili con 'static?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}
error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

Risposte

2 user4815162342 Aug 16 2020 at 20:29

Dal momento che vuoi aspettare sul futuro in parallelo, puoi generarli in singole attività che vengono eseguite in parallelo. Dal momento che corrono indipendentemente l'uno dall'altro e dal thread che li ha generati, puoi attendere le loro maniglie in qualsiasi ordine.

Idealmente dovresti scrivere qualcosa del genere:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them to get the resolve's to complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

Ma questo verrà rifiutato dal controllore del prestito perché il futuro restituito da item.resolve()contiene un riferimento preso in prestito a item. Il riferimento viene passato a tokio::spawn()chi lo passa a un altro thread e il compilatore non può dimostrare che itemsopravviverà a quel thread. (Lo stesso tipo di problema si verifica quando si desidera inviare un riferimento a dati locali a un thread .)

Ci sono diverse possibili soluzioni a questo; quello che trovo più elegante è spostare gli elementi nella chiusura asincrona a cui sono passati tokio::spawn()e chiedere all'attività di restituirli una volta terminato. Fondamentalmente si consuma il itemsvettore per creare le attività e immediatamente ricostituirlo dai risultati attesi:

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

Codice eseguibile nel playground .

Nota che la futurescassa contiene una join_allfunzione simile a quella di cui hai bisogno, tranne che esegue il polling dei singoli futures senza assicurarsi che funzionino in parallelo. Possiamo scrivere un generico join_parallelche utilizza join_all, ma utilizza anche tokio::spawnper ottenere l'esecuzione parallela:

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

Usando questa funzione il codice necessario per rispondere alla domanda si riduce a:

let items = join_parallel(items.into_iter().map(|mut item| async {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

Di nuovo, codice eseguibile nel playground .