Tokio 0.2로 비 정적 미래 생성

Dec 13 2020

일부 Future를 병렬로 실행하고 모든 Future가 완료된 후에 만 ​​반환해야하는 비동기 메서드가 있습니다. 그러나 오래 지속되지 않는 일부 데이터는 참조로 전달됩니다 'static(메인 메서드의 특정 지점에서 삭제됨). 개념적으로 다음과 유사합니다 ( Playground ) :

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]);
}

이제 tokio는 전달 된 미래가 평생 spawn유효 하기 를 원합니다 'static. 미래를 멈추지 않고 핸들을 삭제할 수 있기 때문입니다. 즉, 위의 예에서 다음 오류 메시지가 생성됩니다.

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   |
12 | async fn parallel_stuff(array: &[u64]) {
   |                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
   |                         |
   |                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
   |                    ----- ...and is required to live as long as `'static` here

그래서 내 질문은 : 현재 컨텍스트에 대해서만 유효한 퓨처를 생성하고 모든 것이 완료 될 때까지 기다릴 수있는 방법은 무엇입니까?

(이것이 tokio 0.3에서 가능하지만 0.2가 아니라면 당분간 많은 git 의존성을 포함하더라도 여전히 관심이 있습니다)

답변

3 AliceRyhl Dec 14 2020 at 17:35

'static비동기 Rust에서 미래 가 아닌 것을 스폰하는 것은 불가능합니다 . 이는 모든 비동기 함수가 언제든지 취소 될 수 있기 때문에 호출자가 실제로 생성 된 작업보다 오래 지속된다는 것을 보장 할 방법이 없기 때문입니다.

비동기 작업의 범위 지정 스폰을 허용하는 다양한 크레이트가있는 것은 사실이지만 이러한 크레이트는 비동기 코드에서 사용할 수 없습니다. 그들이 수있는 것은에서 작업을 비동기 범위 산란하는 비 비동기 코드입니다. 이는 위의 문제를 위반하지 않습니다. 왜냐하면이를 생성 한 비동기 코드는 비동기가 아니기 때문에 언제든지 취소 할 수 없기 때문입니다.

일반적으로 이에 대한 두 가지 접근 방식이 있습니다.

  1. 일반 참조가 아닌 'static사용하여 작업을 생성 Arc하십시오.
  2. 생성하는 대신 선물 상자의 동시성 프리미티브를 사용하십시오.

이 답변은 Tokio 0.2.x0.3.x.


일반적으로 정적 작업을 생성하고를 사용하려면 Arc해당 값에 대한 소유권이 있어야합니다. 즉, 함수가 참조로 인수를 취했기 때문에 데이터를 복제하지 않고는이 기술을 사용할 수 없습니다.

async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can shared it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

데이터에 대한 변경 가능한 참조가 있고 데이터 Sized가 슬라이스가 아닌 경우 일시적으로 소유권을 가질 수 있습니다.

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
    
    // Put back the vector where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
}

또 다른 옵션은 선물 상자의 동시성 프리미티브를 사용하는 것입니다. 이것들은 비 'static데이터 로 작업하는 장점이 있지만 작업이 동시에 여러 스레드에서 실행될 수 없다는 단점이 있습니다.

많은 워크 플로의 경우 비동기 코드는 어쨌든 IO를 기다리는 데 대부분의 시간을 소비해야하므로 완벽하게 괜찮습니다.

한 가지 접근 방식은 FuturesUnordered. 이 컬렉션은 다양한 퓨처를 저장할 수있는 특별한 컬렉션으로, next모든 퓨처를 동시에 실행하고 첫 번째가 완료되면 반환 하는 함수가 있습니다. (이 next기능 StreamExt은를 가져올 때만 사용할 수 있습니다. )

다음과 같이 사용할 수 있습니다.

use futures::stream::{FuturesUnordered, StreamExt};

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
        let task = do_sth(i);
        tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
}

참고 :FuturesUnordered 정의해야합니다 공유 값. 그렇지 않으면 잘못된 순서로 삭제되어 발생하는 차용 오류가 발생합니다.


또 다른 접근 방식은 Stream. 스트림을 사용하면 buffer_unordered. FuturesUnordered내부적으로 사용하는 유틸리티입니다 .

use futures::stream::StreamExt;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
        .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
        .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
        .for_each(|()| async {})
        .await;
}

두 경우 모두 StreamExt확장 트레이 트를 가져 오지 않고 스트림에서 사용할 수없는 다양한 메서드를 제공하므로 가져 오기 가 중요합니다.