Comment puis-je générer des méthodes asynchrones dans une boucle?

Aug 16 2020

J'ai un vecteur d'objets qui ont une resolve()méthode qui utilise reqwestpour interroger une API Web externe. Après avoir appelé la resolve()méthode sur chaque objet, je souhaite imprimer le résultat de chaque requête.

Voici mon code semi-asynchrone qui se compile et fonctionne (mais pas vraiment de manière asynchrone):

for mut item in items {
    item.resolve().await;

    item.print_result();
}

J'ai essayé d'utiliser tokio::join!pour générer tous les appels asynchrones et attendre qu'ils se terminent, mais je fais probablement quelque chose de mal:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

Voici l'erreur que j'obtiens:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

Comment puis-je appeler les resolve()méthodes de toutes les instances à la fois?


Ce code reflète la réponse - maintenant je suis confronté à des erreurs de vérification d'emprunt que je ne comprends pas vraiment - dois-je annoter certaines de mes variables 'static?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}
error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

Réponses

2 user4815162342 Aug 16 2020 at 20:29

Puisque vous voulez attendre les futurs en parallèle, vous pouvez les engendrer dans des tâches individuelles qui s'exécutent en parallèle. Comme ils fonctionnent indépendamment les uns des autres et du thread qui les a engendrés, vous pouvez attendre leurs poignées dans n'importe quel ordre.

Idéalement, vous écrivez quelque chose comme ceci:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them to get the resolve's to complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

Mais cela sera rejeté par le vérificateur d'emprunt car le futur retourné par item.resolve()contient une référence empruntée à item. La référence est passée à tokio::spawn()qui la transmet à un autre thread, et le compilateur ne peut pas prouver que itemcela survivra à ce thread. (Le même type de problème se produit lorsque vous souhaitez envoyer une référence à des données locales à un thread .)

Il existe plusieurs solutions possibles à cela; celui que je trouve le plus élégant est de déplacer les éléments vers la fermeture asynchrone tokio::spawn()et de vous les rendre une fois que c'est fait. En gros vous consommez le itemsvecteur pour créer les tâches et le reconstituer immédiatement à partir des résultats attendus:

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

Code exécutable dans la cour de récréation .

Notez que la futurescaisse contient une join_allfonction similaire à ce dont vous avez besoin, sauf qu'elle interroge les futurs individuels sans s'assurer qu'ils fonctionnent en parallèle. Nous pouvons écrire un générique join_parallelqui utilise join_all, mais utilise également tokio::spawnpour obtenir une exécution parallèle:

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

En utilisant cette fonction, le code nécessaire pour répondre à la question se résume à:

let items = join_parallel(items.into_iter().map(|mut item| async {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

Encore une fois, du code exécutable dans la cour de récréation .