Как я могу создавать асинхронные методы в цикле?
У меня есть вектор объектов, у которых есть resolve()
метод, который использует reqwest
для запроса внешнего веб-API. После вызова resolve()
метода для каждого объекта я хочу распечатать результат каждого запроса.
Вот мой полусинхронный код, который компилируется и работает (но не совсем асинхронно):
for mut item in items {
item.resolve().await;
item.print_result();
}
Я пытался использовать tokio::join!
для создания всех асинхронных вызовов и ждать их завершения, но я, вероятно, делаю что-то не так:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
Вот ошибка, которую я получаю:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
Как я могу вызвать resolve()
методы сразу для всех экземпляров?
Этот код отражает ответ - теперь я имею дело с ошибками средства проверки заимствований, которых я действительно не понимаю - следует ли мне аннотировать некоторые из моих переменных 'static
?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
Ответы
Поскольку вы хотите, чтобы фьючерсы ожидали параллельно, вы можете порождать их в отдельные задачи, которые выполняются параллельно. Поскольку они работают независимо друг от друга и от потока, который их породил, вы можете ожидать их дескрипторы в любом порядке.
В идеале вы должны написать что-то вроде этого:
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
Но это будет отклонено программой проверки заимствований, потому что будущее, возвращаемое item.resolve()
функцией, содержит заимствованную ссылку на item
. Ссылка передается в tokio::spawn()
другой поток, и компилятор не может доказать, что item
он переживет этот поток. (Такая же проблема возникает, когда вы хотите отправить ссылку на локальные данные в поток .)
Для этого есть несколько возможных решений; один, который я считаю наиболее элегантным, - это переместить элементы в переданное асинхронное закрытие tokio::spawn()
и заставить задачу вернуть их вам, как только это будет сделано. В основном вы используете items
вектор для создания задач и немедленно восстанавливаете его из ожидаемых результатов:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
Запускаемый код на детской площадке .
Обратите внимание, что futures
ящик содержит join_allфункцию, которая похожа на то, что вам нужно, за исключением того, что она опрашивает отдельные фьючерсы, не гарантируя, что они работают параллельно. Мы можем написать общий тип, join_parallel
который использует join_all
, но также использует tokio::spawn
для параллельного выполнения:
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
Используя эту функцию, код, необходимый для ответа на вопрос, сводится к следующему:
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
Опять же, исполняемый код на детской площадке .