TPL डेटाफ़्लो प्रत्येक फ़ाइल को समकालिक रूप से संसाधित करता है लेकिन एक फ़ाइल के भीतर प्रत्येक पंक्ति अतुल्यकालिक रूप से होती है

Nov 26 2020

इसलिए मेरे उपयोग के मामले में मुझे फाइलों की एक सूची संसाधित करने की आवश्यकता होती है, जहां सूची में प्रत्येक फ़ाइल के लिए मैं प्रत्येक पंक्ति से गुजरता हूं और उन रेखाओं पर कुछ गणना करता हूं। अब मेरी समस्या यह है कि मेरे बफर ब्लॉक में कई फाइलों की लाइनें नहीं हो सकती हैं, इसलिए मुझे मूल रूप से यह सुनिश्चित करने की आवश्यकता है कि एक फाइल पूरी तरह से संसाधित हो (डेटाफ्लो ब्लॉक की एक श्रृंखला के माध्यम से), इससे पहले कि मैं दूसरी फाइल भी दर्ज करूं।

अब मैंने एक प्रसंस्करण द्वारा टीपीएल डेटाफ्लो वन को देखा, जहां उत्तर कहता है कि या तो टीपीएल डेटाफ्लो का उपयोग पूरी तरह से बंद कर दें या कई प्रसंस्करण ब्लॉकों को एक में संलग्न करें ताकि मैं इसे नियंत्रित कर सकूं । लेकिन अगर मैं ऐसा करता हूं कि मैं "कंपेटिबिलिटी" खो दूंगा जो कि tpl प्रदान करता है, तो स्वतंत्र ब्लॉकों में एक साथ गांठ करना थोड़ा बेकार लगता है। क्या ऐसा करने का कोई और तरीका है?

जब मुझे दूसरी फाइल में पोस्ट करने से पहले सबकुछ फ्लश हो गया हो, तो मुझे सूचित करने के लिए लीफ नोड पर OutputAvailableAsync का उपयोग करने के बारे में सोचा। लेकिन मैं OutputAvailableAsync को काम करने के लिए नहीं मिला। यह हमेशा के लिए इंतजार करता है।

संपादित करें

पाइपलाइन के नीचे, मेरे पास राज्य के साथ एक एक्शनब्लॉक होगा, जिसके लिए मैं एक समवर्ती छायाकार का उपयोग करने की योजना बना रहा हूं (एक फ़ाइल में प्रत्येक पंक्ति के लिए मेरे पास नोट की कई चीजें हैं)। अब मैं संभवतः प्रत्येक पंक्ति को अनुक्रमणित नहीं कर सकता क्योंकि इसका मतलब होगा कि मुझे N को एक साथ संसाधित होने वाली फ़ाइलों की संख्या के लिए रखना होगा। यहाँ N संभवत: संसाधित की जाने वाली फ़ाइलों की # होगी।

यह वही है जो मेरे पास है, ध्यान में रखते हुए मैंने अभी अवधारणा के प्रमाण को कोडित किया है।

        static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {

        var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

    }

जवाब

1 TheodorZoulias Nov 28 2020 at 02:13

आप आंशिक रूप से साझा और आंशिक रूप से समर्पित एक पाइपलाइन बनाने के लिए, TPL डेटाफ्लो की सशर्त लिंकिंग क्षमताओं का लाभ उठा सकते हैं । एक एकल रीडर ब्लॉक और एक पार्सर ब्लॉक को सभी फाइलों द्वारा साझा किया जाएगा, जबकि प्रत्येक फ़ाइल के लिए एक समर्पित प्रोसेसर ब्लॉक बनाया जाएगा। यहाँ अवधारणा का एक सरल प्रदर्शन है:

var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
    return (line.Id, line.Line?.Split(","));
});

var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
    var processor = CreateProcessor(file.Id);

    // Create a conditional link from the parser block to the processor block
    var link = parser.LinkTo(processor, entry => entry.Id == file.Id);

    return File
        .ReadLines(file.Path)
        .Select(line => (file.Id, line))
        .Append((file.Id, null)); // Completion signal
});

ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
    var streamWriter = new StreamWriter($@"C:\{fileId}.out");

    return new ActionBlock<(int Id, string[] LineParts)>(line =>
    {
        if (line.LineParts == null)
        {
            streamWriter.Close(); // Completion signal received
            return;
        }
        streamWriter.WriteLine(String.Join("|", line.LineParts));
    });
}

reader.LinkTo(parser);

इस उदाहरण में प्रत्येक फ़ाइल a से संबद्ध है int Id। यह Idप्रत्येक लाइन के साथ पास किया जाता है, ताकि फाइल डाउनस्ट्रीम को फिर से बनाने में सक्षम हो। वैल्यू टुपल्स का उपयोग Idइसकी मूल फ़ाइल के साथ डेटा के प्रत्येक टुकड़े को संयोजित करने के लिए किया जाता है । साझा parserब्लॉक और प्रत्येक समर्पित processorब्लॉक के बीच एक सशर्त लिंक बनाया गया है । एक nullपेलोड का उपयोग फ़ाइल के अंत संकेतक के रूप में किया जाता है। इस संकेत को प्राप्त करने पर एक प्रोसेसर blockको आदर्श रूप से खुद को अनलिंक करना चाहिए parser, ताकि सशर्त लिंकिंग तंत्र के ओवरहेड को न्यूनतम रखा जा सके। विधि linkद्वारा लौटाए गए का निपटान करके अनलिंकिंग किया जाता है LinkTo। सादगी के लिए यह महत्वपूर्ण कदम उपरोक्त उदाहरण से हटा दिया गया है।

मुझे शायद यहाँ दोहराना चाहिए जो मैंने पहले ही अपने पिछले प्रश्न में उत्तर में लिख दिया है , कि ब्लॉक से ब्लॉक करने के लिए अलग-अलग तार गुजरने से महत्वपूर्ण हेडहेड हो जाएगा। पंकिंग (बैचिंग) कार्यभार जाने का एक तरीका है, ताकि यह सुनिश्चित किया जा सके कि पाइपलाइन यथासंभव सुचारू रूप से (घर्षण-मुक्त) प्रदर्शन करेगी।