मैं एक लूप में अतुल्यकालिक तरीकों को कैसे अपना सकता हूं?
मेरे पास वस्तुओं का एक वेक्टर है जिसमें एक resolve()
विधि है reqwest
जो बाहरी वेब एपीआई को क्वेरी करने के लिए उपयोग करती है। resolve()
प्रत्येक ऑब्जेक्ट पर विधि को कॉल करने के बाद , मैं हर अनुरोध के परिणाम को प्रिंट करना चाहता हूं।
यहां मेरा आधा-अतुल्यकालिक कोड है जो संकलित करता है और काम करता है (लेकिन वास्तव में अतुल्यकालिक नहीं):
for mut item in items {
item.resolve().await;
item.print_result();
}
मैंने tokio::join!
सभी async कॉल को स्पॉन करने के लिए उपयोग करने की कोशिश की है और उनके समाप्त होने की प्रतीक्षा कर रहा हूं, लेकिन मैं शायद कुछ गलत कर रहा हूं:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
यहाँ त्रुटि मुझे मिल रही है:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
मैं resolve()
एक ही बार में सभी उदाहरणों के लिए तरीकों को कैसे कह सकता हूं ?
यह कोड उत्तर को दर्शाता है - अब मैं उधार लेने वाले चेकर त्रुटियों से निपट रहा हूं जो मुझे वास्तव में समझ में नहीं आता है - क्या मुझे अपने कुछ चरों की व्याख्या करनी चाहिए 'static
?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
जवाब
चूंकि आप समानांतर में वायदा पर इंतजार करना चाहते हैं, आप उन्हें समानांतर में चलने वाले व्यक्तिगत कार्यों में शामिल कर सकते हैं । चूंकि वे एक-दूसरे के स्वतंत्र रूप से चलते हैं और जिस धागे ने उन्हें पैदा किया है, आप किसी भी क्रम में उनके हैंडल का इंतजार कर सकते हैं।
आदर्श रूप में आप कुछ इस तरह लिखेंगे:
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
लेकिन इसे उधारकर्ता द्वारा अस्वीकार कर दिया जाएगा क्योंकि भविष्य द्वारा लौटाया गया item.resolve()
एक उधार संदर्भ है item
। इस संदर्भ को पारित कर दिया जाता है, tokio::spawn()
जो इसे दूसरे धागे को सौंप देता है, और संकलक यह साबित नहीं कर सकता है कि item
उस धागे को बदल देगा। (जब आप किसी थ्रेड पर स्थानीय डेटा का संदर्भ भेजना चाहते हैं तो उसी तरह की समस्या सामने आती है ।)
इसके कई संभावित समाधान हैं; जो मुझे सबसे अधिक सुंदर लगता है वह यह है कि आइटम को पास किए गए async कोठरी में ले जाना है tokio::spawn()
, और काम पूरा होने के बाद उन्हें वापस आपके हाथ में देना है। मूल रूप से आप items
कार्यों को बनाने के लिए वेक्टर का उपभोग करते हैं और तुरंत प्रतीक्षित परिणामों से इसका पुनर्गठन करते हैं:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
खेल के मैदान में चलने योग्य कोड ।
ध्यान दें कि futures
टोकरे में एक join_allफ़ंक्शन होता है जो आपके लिए आवश्यक समान है, सिवाय इसके कि यह सुनिश्चित करने के बिना व्यक्तिगत वायदा को पोल करता है कि वे समानांतर में चलते हैं। हम एक जेनेरिक लिख सकते हैं join_parallel
जो उपयोग करता है join_all
, लेकिन tokio::spawn
समानांतर निष्पादन प्राप्त करने के लिए भी उपयोग करता है:
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
इस फ़ंक्शन का उपयोग करके प्रश्न का उत्तर देने के लिए आवश्यक कोड को केवल नीचे दिया गया है:
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
खेल के मैदान में फिर से रन करने योग्य कोड ।