Genera un futuro no estático con Tokio 0.2

Dec 13 2020

Tengo un método asincrónico que debería ejecutar algunos futuros en paralelo y solo regresar después de que todos los futuros terminen. Sin embargo, se pasan algunos datos por referencia que no viven tanto tiempo como 'static(se eliminarán en algún momento del método principal). Conceptualmente, es similar a esto ( Zona de juegos ):

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]);
}

Ahora, tokio quiere que los futuros que se pasan spawnsean válidos de por 'staticvida, porque podría dejar el control sin que el futuro se detenga. Eso significa que mi ejemplo anterior produce este mensaje de error:

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   |
12 | async fn parallel_stuff(array: &[u64]) {
   |                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
   |                         |
   |                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
   |                    ----- ...and is required to live as long as `'static` here

Entonces, mi pregunta es: ¿Cómo genero futuros que solo son válidos para el contexto actual y luego puedo esperar hasta que se completen todos?

(si esto es posible en tokio 0.3 pero no en 0.2, todavía estoy interesado, aunque eso implicaría muchas dependencias de git por el momento)

Respuestas

3 AliceRyhl Dec 14 2020 at 17:35

No es posible generar un no 'staticfuturo a partir de async Rust. Esto se debe a que cualquier función asíncrona puede cancelarse en cualquier momento, por lo que no hay forma de garantizar que la persona que llama realmente sobreviva a las tareas generadas.

Es cierto que hay varias cajas que permiten la generación de tareas asíncronas con alcance, pero estas cajas no se pueden usar desde código asíncrono. Lo que permiten es generar tareas asíncronas de ámbito a partir de código no asíncrono . Esto no viola el problema anterior, porque el código no asíncrono que los generó no se puede cancelar en ningún momento, ya que no es asíncrono.

Generalmente hay dos enfoques para esto:

  1. Genere una 'statictarea utilizando Arcreferencias en lugar de ordinarias.
  2. Utilice las primitivas de simultaneidad de la caja de futuros en lugar de generar.

Tenga en cuenta que esta respuesta se aplica tanto a Tokio 0.2.xcomo a 0.3.x.


Generalmente, para generar una tarea estática y su uso Arc, debe tener la propiedad de los valores en cuestión. Esto significa que, dado que su función tomó el argumento por referencia, no puede usar esta técnica sin clonar los datos.

async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can shared it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

Tenga en cuenta que si tiene una referencia mutable a los datos, y los datos son Sized, es decir, no un segmento, es posible tomar posesión de ellos temporalmente.

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
    
    // Put back the vector where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
}

Otra opción es utilizar las primitivas de simultaneidad de la caja de futuros. Estos tienen la ventaja de trabajar con no 'staticdatos, pero la desventaja de que las tareas no podrán ejecutarse en varios subprocesos al mismo tiempo.

Para muchos flujos de trabajo, esto está perfectamente bien, ya que el código asincrónico debería pasar la mayor parte del tiempo esperando IO de todos modos.

Un enfoque es utilizar FuturesUnordered. Esta es una colección especial que puede almacenar muchos futuros diferentes, y tiene una nextfunción que los ejecuta todos al mismo tiempo, y regresa una vez que finaliza el primero de ellos. (La nextfunción solo está disponible cuando StreamExtse importa)

Puedes usarlo así:

use futures::stream::{FuturesUnordered, StreamExt};

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
        let task = do_sth(i);
        tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
}

Nota: El FuturesUnordereddebe ser definido después del valor compartido. De lo contrario, obtendrá un error de préstamo debido a que se colocaron en el orden incorrecto.


Otro enfoque es utilizar un Stream. Con las transmisiones, puede usar buffer_unordered. Esta es una utilidad que usa FuturesUnorderedinternamente.

use futures::stream::StreamExt;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
        .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
        .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
        .for_each(|()| async {})
        .await;
}

Tenga en cuenta que en ambos casos, la importación StreamExtes importante ya que proporciona varios métodos que no están disponibles en las transmisiones sin importar el rasgo de extensión.