Genera un futuro no estático con Tokio 0.2
Tengo un método asincrónico que debería ejecutar algunos futuros en paralelo y solo regresar después de que todos los futuros terminen. Sin embargo, se pasan algunos datos por referencia que no viven tanto tiempo como 'static
(se eliminarán en algún momento del método principal). Conceptualmente, es similar a esto ( Zona de juegos ):
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in array {
let task = spawn(do_sth(i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
#[tokio::main]
async fn main() {
parallel_stuff(&[3, 1, 4, 2]);
}
Ahora, tokio quiere que los futuros que se pasan spawn
sean válidos de por 'static
vida, porque podría dejar el control sin que el futuro se detenga. Eso significa que mi ejemplo anterior produce este mensaje de error:
error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:12:25
|
12 | async fn parallel_stuff(array: &[u64]) {
| ^^^^^ ------ this data with an anonymous lifetime `'_`...
| |
| ...is captured here...
...
15 | let task = spawn(do_sth(i));
| ----- ...and is required to live as long as `'static` here
Entonces, mi pregunta es: ¿Cómo genero futuros que solo son válidos para el contexto actual y luego puedo esperar hasta que se completen todos?
(si esto es posible en tokio 0.3 pero no en 0.2, todavía estoy interesado, aunque eso implicaría muchas dependencias de git por el momento)
Respuestas
No es posible generar un no 'static
futuro a partir de async Rust. Esto se debe a que cualquier función asíncrona puede cancelarse en cualquier momento, por lo que no hay forma de garantizar que la persona que llama realmente sobreviva a las tareas generadas.
Es cierto que hay varias cajas que permiten la generación de tareas asíncronas con alcance, pero estas cajas no se pueden usar desde código asíncrono. Lo que sí permiten es generar tareas asíncronas de ámbito a partir de código no asíncrono . Esto no viola el problema anterior, porque el código no asíncrono que los generó no se puede cancelar en ningún momento, ya que no es asíncrono.
Generalmente hay dos enfoques para esto:
- Genere una
'static
tarea utilizandoArc
referencias en lugar de ordinarias. - Utilice las primitivas de simultaneidad de la caja de futuros en lugar de generar.
Tenga en cuenta que esta respuesta se aplica tanto a Tokio 0.2.x
como a 0.3.x
.
Generalmente, para generar una tarea estática y su uso Arc
, debe tener la propiedad de los valores en cuestión. Esto significa que, dado que su función tomó el argumento por referencia, no puede usar esta técnica sin clonar los datos.
async fn do_sth(with: Arc<[u64]>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &[u64]) {
// Make a clone of the data so we can shared it across tasks.
let shared: Arc<[u64]> = Arc::from(array);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
Tenga en cuenta que si tiene una referencia mutable a los datos, y los datos son Sized
, es decir, no un segmento, es posible tomar posesión de ellos temporalmente.
async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &mut Vec<u64>) {
// Swap the array with an empty one to temporarily take ownership.
let vec = std::mem::take(array);
let shared = Arc::new(vec);
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
// Put back the vector where we took it from.
// This works because there is only one Arc left.
*array = Arc::try_unwrap(shared).unwrap();
}
Otra opción es utilizar las primitivas de simultaneidad de la caja de futuros. Estos tienen la ventaja de trabajar con no 'static
datos, pero la desventaja de que las tareas no podrán ejecutarse en varios subprocesos al mismo tiempo.
Para muchos flujos de trabajo, esto está perfectamente bien, ya que el código asincrónico debería pasar la mayor parte del tiempo esperando IO de todos modos.
Un enfoque es utilizar FuturesUnordered. Esta es una colección especial que puede almacenar muchos futuros diferentes, y tiene una next
función que los ejecuta todos al mismo tiempo, y regresa una vez que finaliza el primero de ellos. (La next
función solo está disponible cuando StreamExt
se importa)
Puedes usarlo así:
use futures::stream::{FuturesUnordered, StreamExt};
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks = FuturesUnordered::new();
for i in array {
let task = do_sth(i);
tasks.push(task);
}
// This loop runs everything concurrently, and waits until they have
// all finished.
while let Some(()) = tasks.next().await { }
}
Nota: El FuturesUnordered
debe ser definido después del valor compartido. De lo contrario, obtendrá un error de préstamo debido a que se colocaron en el orden incorrecto.
Otro enfoque es utilizar un Stream
. Con las transmisiones, puede usar buffer_unordered. Esta es una utilidad que usa FuturesUnordered
internamente.
use futures::stream::StreamExt;
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
// Create a stream going through the array.
futures::stream::iter(array)
// For each item in the stream, create a future.
.map(|i| do_sth(i))
// Run at most 10 of the futures concurrently.
.buffer_unordered(10)
// Since Streams are lazy, we must use for_each or collect to run them.
// Here we use for_each and do nothing with the return value from do_sth.
.for_each(|()| async {})
.await;
}
Tenga en cuenta que en ambos casos, la importación StreamExt
es importante ya que proporciona varios métodos que no están disponibles en las transmisiones sin importar el rasgo de extensión.