Jak mogę odrodzić metody asynchroniczne w pętli?

Aug 16 2020

Mam wektor obiektów, które mają resolve()metodę używaną reqwestdo wysyłania zapytań do zewnętrznego internetowego interfejsu API. Po wywołaniu resolve()metody na każdym obiekcie chcę wydrukować wynik każdego żądania.

Oto mój półasynchroniczny kod, który kompiluje się i działa (ale nie tak naprawdę asynchronicznie):

for mut item in items {
    item.resolve().await;

    item.print_result();
}

Próbowałem użyć tokio::join!do odrodzenia wszystkich wywołań asynchronicznych i czekać na ich zakończenie, ale prawdopodobnie robię coś nie tak:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

Oto błąd, który otrzymuję:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

Jak mogę wywołać resolve()metody dla wszystkich instancji jednocześnie?


Ten kod odzwierciedla odpowiedź - teraz mam do czynienia z błędami sprawdzania pożyczek, których tak naprawdę nie rozumiem - czy powinienem dodać adnotacje do niektórych moich zmiennych 'static?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}
error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

Odpowiedzi

2 user4815162342 Aug 16 2020 at 20:29

Ponieważ chcesz równolegle czekać na przyszłość, możesz odrodzić je w pojedyncze zadania, które działają równolegle. Ponieważ działają one niezależnie od siebie i od wątku, który je zrodził, możesz czekać na ich uchwyty w dowolnej kolejności.

Idealnie byłoby napisać coś takiego:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them to get the resolve's to complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

Ale to zostanie odrzucone przez item.resolve()osobę sprawdzającą pożyczki, ponieważ przyszłość zwrócona przez zawiera pożyczone odniesienie do item. Odwołanie jest przekazywane do tokio::spawn()innego wątku, a kompilator nie może udowodnić, że itemprzeżyje ten wątek. (Ten sam rodzaj problemu występuje, gdy chcesz wysłać odniesienie do danych lokalnych do wątku ).

Istnieje kilka możliwych rozwiązań tego problemu; najbardziej eleganckie jest przenoszenie elementów do przekazanego zamknięcia asynchronicznego tokio::spawn(), a po zakończeniu zadanie oddanie ich z powrotem. Zasadniczo zużywasz itemswektor do tworzenia zadań i natychmiast odtwarzasz go z oczekiwanych wyników:

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

Kod do uruchomienia na placu zabaw .

Zauważ, że futuresskrzynka zawiera join_allfunkcję, która jest podobna do tej, której potrzebujesz, z wyjątkiem tego, że sonduje poszczególne futures bez upewnienia się, że działają one równolegle. Możemy napisać generyczny, join_parallelktóry używa join_all, ale także używa, tokio::spawnaby uzyskać równoległe wykonanie:

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

Używając tej funkcji, kod potrzebny do odpowiedzi na pytanie sprowadza się do:

let items = join_parallel(items.into_iter().map(|mut item| async {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

Ponownie, kod do uruchomienia na placu zabaw .