TPL डेटाफ़्लो प्रत्येक फ़ाइल को समकालिक रूप से संसाधित करता है लेकिन एक फ़ाइल के भीतर प्रत्येक पंक्ति अतुल्यकालिक रूप से होती है
इसलिए मेरे उपयोग के मामले में मुझे फाइलों की एक सूची संसाधित करने की आवश्यकता होती है, जहां सूची में प्रत्येक फ़ाइल के लिए मैं प्रत्येक पंक्ति से गुजरता हूं और उन रेखाओं पर कुछ गणना करता हूं। अब मेरी समस्या यह है कि मेरे बफर ब्लॉक में कई फाइलों की लाइनें नहीं हो सकती हैं, इसलिए मुझे मूल रूप से यह सुनिश्चित करने की आवश्यकता है कि एक फाइल पूरी तरह से संसाधित हो (डेटाफ्लो ब्लॉक की एक श्रृंखला के माध्यम से), इससे पहले कि मैं दूसरी फाइल भी दर्ज करूं।
अब मैंने एक प्रसंस्करण द्वारा टीपीएल डेटाफ्लो वन को देखा, जहां उत्तर कहता है कि या तो टीपीएल डेटाफ्लो का उपयोग पूरी तरह से बंद कर दें या कई प्रसंस्करण ब्लॉकों को एक में संलग्न करें ताकि मैं इसे नियंत्रित कर सकूं । लेकिन अगर मैं ऐसा करता हूं कि मैं "कंपेटिबिलिटी" खो दूंगा जो कि tpl प्रदान करता है, तो स्वतंत्र ब्लॉकों में एक साथ गांठ करना थोड़ा बेकार लगता है। क्या ऐसा करने का कोई और तरीका है?
जब मुझे दूसरी फाइल में पोस्ट करने से पहले सबकुछ फ्लश हो गया हो, तो मुझे सूचित करने के लिए लीफ नोड पर OutputAvailableAsync का उपयोग करने के बारे में सोचा। लेकिन मैं OutputAvailableAsync को काम करने के लिए नहीं मिला। यह हमेशा के लिए इंतजार करता है।
संपादित करें
पाइपलाइन के नीचे, मेरे पास राज्य के साथ एक एक्शनब्लॉक होगा, जिसके लिए मैं एक समवर्ती छायाकार का उपयोग करने की योजना बना रहा हूं (एक फ़ाइल में प्रत्येक पंक्ति के लिए मेरे पास नोट की कई चीजें हैं)। अब मैं संभवतः प्रत्येक पंक्ति को अनुक्रमणित नहीं कर सकता क्योंकि इसका मतलब होगा कि मुझे N को एक साथ संसाधित होने वाली फ़ाइलों की संख्या के लिए रखना होगा। यहाँ N संभवत: संसाधित की जाने वाली फ़ाइलों की # होगी।
यह वही है जो मेरे पास है, ध्यान में रखते हुए मैंने अभी अवधारणा के प्रमाण को कोडित किया है।
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
जवाब
आप आंशिक रूप से साझा और आंशिक रूप से समर्पित एक पाइपलाइन बनाने के लिए, TPL डेटाफ्लो की सशर्त लिंकिंग क्षमताओं का लाभ उठा सकते हैं । एक एकल रीडर ब्लॉक और एक पार्सर ब्लॉक को सभी फाइलों द्वारा साझा किया जाएगा, जबकि प्रत्येक फ़ाइल के लिए एक समर्पित प्रोसेसर ब्लॉक बनाया जाएगा। यहाँ अवधारणा का एक सरल प्रदर्शन है:
var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
return (line.Id, line.Line?.Split(","));
});
var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
var processor = CreateProcessor(file.Id);
// Create a conditional link from the parser block to the processor block
var link = parser.LinkTo(processor, entry => entry.Id == file.Id);
return File
.ReadLines(file.Path)
.Select(line => (file.Id, line))
.Append((file.Id, null)); // Completion signal
});
ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
var streamWriter = new StreamWriter($@"C:\{fileId}.out");
return new ActionBlock<(int Id, string[] LineParts)>(line =>
{
if (line.LineParts == null)
{
streamWriter.Close(); // Completion signal received
return;
}
streamWriter.WriteLine(String.Join("|", line.LineParts));
});
}
reader.LinkTo(parser);
इस उदाहरण में प्रत्येक फ़ाइल a से संबद्ध है int Id। यह Idप्रत्येक लाइन के साथ पास किया जाता है, ताकि फाइल डाउनस्ट्रीम को फिर से बनाने में सक्षम हो। वैल्यू टुपल्स का उपयोग Idइसकी मूल फ़ाइल के साथ डेटा के प्रत्येक टुकड़े को संयोजित करने के लिए किया जाता है । साझा parserब्लॉक और प्रत्येक समर्पित processorब्लॉक के बीच एक सशर्त लिंक बनाया गया है । एक nullपेलोड का उपयोग फ़ाइल के अंत संकेतक के रूप में किया जाता है। इस संकेत को प्राप्त करने पर एक प्रोसेसर blockको आदर्श रूप से खुद को अनलिंक करना चाहिए parser, ताकि सशर्त लिंकिंग तंत्र के ओवरहेड को न्यूनतम रखा जा सके। विधि linkद्वारा लौटाए गए का निपटान करके अनलिंकिंग किया जाता है LinkTo। सादगी के लिए यह महत्वपूर्ण कदम उपरोक्त उदाहरण से हटा दिया गया है।
मुझे शायद यहाँ दोहराना चाहिए जो मैंने पहले ही अपने पिछले प्रश्न में उत्तर में लिख दिया है , कि ब्लॉक से ब्लॉक करने के लिए अलग-अलग तार गुजरने से महत्वपूर्ण हेडहेड हो जाएगा। पंकिंग (बैचिंग) कार्यभार जाने का एक तरीका है, ताकि यह सुनिश्चित किया जा सके कि पाइपलाइन यथासंभव सुचारू रूप से (घर्षण-मुक्त) प्रदर्शन करेगी।