Gerar futuro não estático com Tokio 0.2

Dec 13 2020

Eu tenho um método assíncrono que deve executar alguns futuros em paralelo e só retornar depois que todos os futuros terminarem. No entanto, são passados ​​alguns dados por referência que não duram tanto quanto 'static(eles serão descartados em algum ponto do método principal). Conceitualmente, é semelhante a isto ( Playground ):

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]);
}

Agora, tokio quer que os futuros passados spawnsejam válidos para o 'staticresto da vida, porque eu poderia largar a manivela sem que o futuro parasse. Isso significa que meu exemplo acima produz esta mensagem de erro:

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   |
12 | async fn parallel_stuff(array: &[u64]) {
   |                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
   |                         |
   |                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
   |                    ----- ...and is required to live as long as `'static` here

Portanto, minha pergunta é: Como faço para gerar futuros que são válidos apenas para o contexto atual para que eu possa esperar até que todos eles sejam concluídos?

(se isso for possível no tokio 0.3, mas não no 0.2, ainda estou interessado, embora isso envolva muitas dependências git por enquanto)

Respostas

3 AliceRyhl Dec 14 2020 at 17:35

Não é possível gerar um não 'staticfuturo a partir do Rust assíncrono. Isso ocorre porque qualquer função assíncrona pode ser cancelada a qualquer momento, portanto, não há como garantir que o chamador realmente sobreviva às tarefas geradas.

É verdade que existem vários engradados que permitem spawns com escopo de tarefas assíncronas, mas esses engradados não podem ser usados ​​em código assíncrono. O que eles não permitem que é para desovar escopo assíncrona tarefas de não-async código. Isso não viola o problema acima, porque o código não assíncrono que os gerou não pode ser cancelado a qualquer momento, pois não é assíncrono.

Geralmente, existem duas abordagens para isso:

  1. Crie uma 'statictarefa usando em Arcvez de referências comuns.
  2. Use as primitivas de simultaneidade da caixa de futuros em vez de desovar.

Observe que esta resposta se aplica a Tokio 0.2.xe 0.3.x.


Geralmente, para gerar uma tarefa estática e usar Arc, você deve ter propriedade dos valores em questão. Isso significa que, como sua função tomou o argumento por referência, você não pode usar essa técnica sem clonar os dados.

async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can shared it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

Observe que, se você tiver uma referência mutável aos dados, e os dados não forem Sized, ou seja, uma fatia, é possível apropriar-se dela temporariamente.

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
    
    // Put back the vector where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
}

Outra opção é usar as primitivas de simultaneidade da caixa de futuros. Eles têm a vantagem de trabalhar com não 'staticdados, mas a desvantagem de que as tarefas não poderão ser executadas em vários threads ao mesmo tempo.

Para muitos fluxos de trabalho, isso é perfeitamente normal, já que o código assíncrono deve passar a maior parte do tempo esperando pelo IO de qualquer maneira.

Uma abordagem é usar FuturesUnordered. Esta é uma coleção especial que pode armazenar muitos futuros diferentes e tem uma nextfunção que executa todos eles simultaneamente e retorna assim que o primeiro terminar. (A nextfunção só está disponível quando StreamExté importado)

Você pode usá-lo assim:

use futures::stream::{FuturesUnordered, StreamExt};

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
        let task = do_sth(i);
        tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
}

Nota: O FuturesUnordereddeve ser definido após o valor compartilhado. Caso contrário, você obterá um erro de empréstimo causado por eles terem sido descartados na ordem errada.


Outra abordagem é usar um Stream. Com streams, você pode usar buffer_unordered. Este é um utilitário que usa FuturesUnorderedinternamente.

use futures::stream::StreamExt;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
        .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
        .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
        .for_each(|()| async {})
        .await;
}

Observe que, em ambos os casos, a importação StreamExté importante, pois fornece vários métodos que não estão disponíveis em fluxos sem importar o traço de extensão.