Jak mogę odrodzić metody asynchroniczne w pętli?
Mam wektor obiektów, które mają resolve()
metodę używaną reqwest
do wysyłania zapytań do zewnętrznego internetowego interfejsu API. Po wywołaniu resolve()
metody na każdym obiekcie chcę wydrukować wynik każdego żądania.
Oto mój półasynchroniczny kod, który kompiluje się i działa (ale nie tak naprawdę asynchronicznie):
for mut item in items {
item.resolve().await;
item.print_result();
}
Próbowałem użyć tokio::join!
do odrodzenia wszystkich wywołań asynchronicznych i czekać na ich zakończenie, ale prawdopodobnie robię coś nie tak:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
Oto błąd, który otrzymuję:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
Jak mogę wywołać resolve()
metody dla wszystkich instancji jednocześnie?
Ten kod odzwierciedla odpowiedź - teraz mam do czynienia z błędami sprawdzania pożyczek, których tak naprawdę nie rozumiem - czy powinienem dodać adnotacje do niektórych moich zmiennych 'static
?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
Odpowiedzi
Ponieważ chcesz równolegle czekać na przyszłość, możesz odrodzić je w pojedyncze zadania, które działają równolegle. Ponieważ działają one niezależnie od siebie i od wątku, który je zrodził, możesz czekać na ich uchwyty w dowolnej kolejności.
Idealnie byłoby napisać coś takiego:
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
Ale to zostanie odrzucone przez item.resolve()
osobę sprawdzającą pożyczki, ponieważ przyszłość zwrócona przez zawiera pożyczone odniesienie do item
. Odwołanie jest przekazywane do tokio::spawn()
innego wątku, a kompilator nie może udowodnić, że item
przeżyje ten wątek. (Ten sam rodzaj problemu występuje, gdy chcesz wysłać odniesienie do danych lokalnych do wątku ).
Istnieje kilka możliwych rozwiązań tego problemu; najbardziej eleganckie jest przenoszenie elementów do przekazanego zamknięcia asynchronicznego tokio::spawn()
, a po zakończeniu zadanie oddanie ich z powrotem. Zasadniczo zużywasz items
wektor do tworzenia zadań i natychmiast odtwarzasz go z oczekiwanych wyników:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
Kod do uruchomienia na placu zabaw .
Zauważ, że futures
skrzynka zawiera join_allfunkcję, która jest podobna do tej, której potrzebujesz, z wyjątkiem tego, że sonduje poszczególne futures bez upewnienia się, że działają one równolegle. Możemy napisać generyczny, join_parallel
który używa join_all
, ale także używa, tokio::spawn
aby uzyskać równoległe wykonanie:
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
Używając tej funkcji, kod potrzebny do odpowiedzi na pytanie sprowadza się do:
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
Ponownie, kod do uruchomienia na placu zabaw .