¿Cómo puedo generar métodos asincrónicos en un bucle?

Aug 16 2020

Tengo un vector de objetos que tiene un resolve()método que utiliza reqwestpara consultar una API web externa. Después de llamar al resolve()método en cada objeto, quiero imprimir el resultado de cada solicitud.

Aquí está mi código medio asíncrono que se compila y funciona (pero no realmente de forma asincrónica):

for mut item in items {
    item.resolve().await;

    item.print_result();
}

Intenté usar tokio::join!para generar todas las llamadas asíncronas y esperar a que terminen, pero probablemente estoy haciendo algo mal:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

Aquí está el error que recibo:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

¿Cómo puedo llamar a los resolve()métodos para todas las instancias a la vez?


Este código refleja la respuesta, ahora estoy lidiando con errores del comprobador de préstamos que realmente no entiendo, ¿debería anotar algunas de mis variables 'static?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}
error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

Respuestas

2 user4815162342 Aug 16 2020 at 20:29

Dado que desea esperar los futuros en paralelo, puede generarlos en tareas individuales que se ejecutan en paralelo. Dado que se ejecutan de forma independiente entre sí y del hilo que los generó, puede esperar sus identificadores en cualquier orden.

Idealmente escribirías algo como esto:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them to get the resolve's to complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

Pero esto será rechazado por el verificador de préstamos porque el futuro devuelto por item.resolve()contiene una referencia prestada a item. La referencia se pasa a tokio::spawn()quien la pasa a otro hilo, y el compilador no puede probar que itemsobrevivirá a ese hilo. (El mismo tipo de problema se encuentra cuando desea enviar una referencia a datos locales a un hilo ).

Hay varias soluciones posibles para esto; el que encuentro más elegante es mover elementos al cierre asíncrono al que se ha pasado tokio::spawn()y hacer que la tarea se los devuelva una vez que esté hecho. Básicamente, consume el itemsvector para crear las tareas e inmediatamente lo reconstituye a partir de los resultados esperados:

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

Código ejecutable en el patio de recreo .

Tenga en cuenta que la futurescaja contiene una join_allfunción que es similar a la que necesita, excepto que sondea los futuros individuales sin asegurarse de que se ejecuten en paralelo. Podemos escribir un genérico join_parallelque use join_all, pero también use tokio::spawnpara obtener ejecución en paralelo:

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

Con esta función, el código necesario para responder la pregunta se reduce a:

let items = join_parallel(items.into_iter().map(|mut item| async {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

Nuevamente, código ejecutable en el patio de recreo .