ฉันจะวางไข่วิธีอะซิงโครนัสในลูปได้อย่างไร?

Aug 16 2020

ฉันมีเวกเตอร์ของออบเจ็กต์ที่มีresolve()วิธีที่ใช้reqwestในการสืบค้น API เว็บภายนอก หลังจากที่ฉันเรียกใช้resolve()เมธอดในแต่ละออบเจ็กต์ฉันต้องการพิมพ์ผลลัพธ์ของทุกคำขอ

นี่คือโค้ดครึ่งอะซิงโครนัสของฉันที่รวบรวมและใช้งานได้ (แต่ไม่ใช่แบบอะซิงโครนัสจริงๆ):

for mut item in items {
    item.resolve().await;

    item.print_result();
}

ฉันพยายามใช้tokio::join!เพื่อวางสาย async ทั้งหมดและรอให้มันเสร็จสิ้น แต่ฉันอาจทำอะไรผิดพลาด:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

นี่คือข้อผิดพลาดที่ฉันได้รับ:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

ฉันจะเรียกresolve()เมธอดสำหรับอินสแตนซ์ทั้งหมดพร้อมกันได้อย่างไร


รหัสนี้สะท้อนคำตอบ - ตอนนี้ฉันกำลังจัดการกับข้อผิดพลาดของตัวตรวจสอบการยืมที่ฉันไม่เข้าใจจริงๆ - ฉันควรใส่คำอธิบายประกอบตัวแปรของฉันด้วย'staticหรือไม่?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}
error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

คำตอบ

2 user4815162342 Aug 16 2020 at 20:29

เนื่องจากคุณต้องการรออนาคตควบคู่กันไปคุณจึงสามารถวางไข่เป็นงานแต่ละงานที่ทำงานควบคู่กันได้ เนื่องจากพวกมันทำงานเป็นอิสระจากกันและจากเธรดที่สร้างพวกมันคุณสามารถรอการจัดการของพวกมันในลำดับใดก็ได้

ตามหลักการแล้วคุณจะเขียนสิ่งนี้:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them to get the resolve's to complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

แต่นี้จะถูกปฏิเสธโดยตรวจสอบการยืมเพราะอนาคตที่ส่งกลับโดยถือหุ้นที่ยืมมาอ้างอิงถึงitem.resolve() itemการอ้างอิงถูกส่งผ่านไปtokio::spawn()ยังเธรดอื่นและคอมไพเลอร์ไม่สามารถพิสูจน์ได้ว่าitemจะอยู่ได้นานกว่าเธรดนั้น (ปัญหาเดียวกันนี้จะพบเมื่อคุณต้องการส่งการอ้างอิงไปยังข้อมูลในเครื่องไปยังเธรด )

มีวิธีแก้ปัญหาที่เป็นไปได้หลายประการ สิ่งที่ฉันคิดว่าดีที่สุดคือการย้ายรายการไปยังการปิด async ที่ส่งผ่านไปtokio::spawn()และให้งานส่งกลับมาให้คุณเมื่อเสร็จสิ้น โดยทั่วไปคุณใช้itemsเวกเตอร์เพื่อสร้างงานและสร้างใหม่ทันทีจากผลลัพธ์ที่รอคอย:

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

รหัส Runnable ในสนามเด็กเล่น

โปรดทราบว่าfuturesลังมีjoin_allฟังก์ชันที่คล้ายกับสิ่งที่คุณต้องการยกเว้นว่าจะสำรวจฟิวเจอร์สแต่ละรายการโดยไม่ต้องแน่ใจว่าลังทำงานแบบขนาน เราสามารถเขียนทั่วไปjoin_parallelที่ใช้join_allแต่ยังใช้tokio::spawnเพื่อรับการดำเนินการแบบขนาน:

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

การใช้ฟังก์ชันนี้รหัสที่จำเป็นในการตอบคำถามจะลดลงเหลือเพียง:

let items = join_parallel(items.into_iter().map(|mut item| async {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

อีกครั้งรหัสที่ทำงานได้ในสนามเด็กเล่น