루프에서 비동기 메서드를 어떻게 생성 할 수 있습니까?

Aug 16 2020

외부 웹 API를 쿼리하는 resolve()데 사용 하는 메서드 가있는 개체 벡터가 있습니다 reqwest. resolve()각 개체에 대해 메서드를 호출 한 후 모든 요청의 결과를 인쇄하고 싶습니다.

다음은 컴파일되고 작동하는 반 비동기 코드입니다 (실제로 비동기 적으로는 아님).

for mut item in items {
    item.resolve().await;

    item.print_result();
}

tokio::join!모든 비동기 호출을 생성하고 완료 될 때까지 기다리 려고했지만 아마도 뭔가 잘못하고있을 것입니다.

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

내가받는 오류는 다음과 같습니다.

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

resolve()모든 인스턴스에 대한 메서드를 한 번에 호출하려면 어떻게 해야합니까?


이 코드는 대답을 반영합니다. 이제 실제로 이해하지 못하는 차입 검사기 오류를 처리하고 있습니다. 일부 변수에 주석을 추가해야 'static합니까?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}
error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

답변

2 user4815162342 Aug 16 2020 at 20:29

병렬로 미래를 기다리고 싶기 때문에 병렬 로 실행되는 개별 작업으로 생성 할 수 있습니다 . 이들은 서로 독립적으로 실행되고이를 생성 한 스레드에 관계없이 실행되므로 순서에 관계없이 핸들을 기다릴 수 있습니다.

이상적으로는 다음과 같이 작성하는 것이 좋습니다.

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them to get the resolve's to complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

그러나에서 반환 된 미래 item.resolve()가에 대한 차용 참조를 보유 하기 때문에 차용 검사기에서 거부됩니다 item. 참조는 tokio::spawn()다른 스레드 로 전달되는 참조가 전달되며 컴파일러는 item해당 스레드보다 오래 지속된다는 것을 증명할 수 없습니다 . ( 로컬 데이터에 대한 참조를 스레드 로 보내려고 할 때 동일한 종류의 문제가 발생 합니다 .)

이에 대한 몇 가지 가능한 해결책이 있습니다. 내가 가장 우아 하다고 생각하는 것은 항목을에 전달 된 비동기 클로저로 옮기고tokio::spawn() 작업이 완료되면 다시 돌려주는 것입니다. 기본적으로 items벡터를 사용하여 작업을 생성하고 대기 한 결과에서 즉시 재구성합니다.

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

놀이터 에서 실행 가능한 코드 .

참고는 것을 futures상자가 포함되어 join_all당신이 필요 비슷합니다 기능을, 그들은 병렬로 실행을 보장하지 않고 여론 조사를 각각의 미래를 제외시켰다. join_parallel를 사용 하는 제네릭 을 작성할 수 join_all있지만 tokio::spawn병렬 실행을 얻는데 도 사용 합니다.

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

이 함수를 사용하면 질문에 답하는 데 필요한 코드는 다음과 같이 요약됩니다.

let items = join_parallel(items.into_iter().map(|mut item| async {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

다시 말하지만 플레이 그라운드 에서 실행 가능한 코드입니다 .