अपाचे स्पार्क - त्वरित गाइड
उद्योग अपने डेटा सेट का विश्लेषण करने के लिए Hadoop का बड़े पैमाने पर उपयोग कर रहे हैं। कारण यह है कि Hadoop फ्रेमवर्क एक सरल प्रोग्रामिंग मॉडल (MapReduce) पर आधारित है और यह एक कंप्यूटिंग समाधान को सक्षम करता है जो स्केलेबल, लचीला, दोष-सहिष्णु और लागत प्रभावी है। यहां, प्रश्नों को चलाने और कार्यक्रम को चलाने के लिए प्रतीक्षा समय के बीच बड़े डेटासेट को संसाधित करने में गति बनाए रखना मुख्य चिंता है।
स्पैडो को अपाचे सॉफ्टवेयर फाउंडेशन द्वारा हडोप कम्प्यूट कम्प्यूटिंग सॉफ्टवेयर प्रक्रिया में तेजी लाने के लिए पेश किया गया था।
एक आम धारणा के विपरीत, Spark is not a modified version of Hadoopऔर, वास्तव में, Hadoop पर निर्भर नहीं है क्योंकि इसका अपना क्लस्टर प्रबंधन है। हाडोप स्पार्क को लागू करने के तरीकों में से एक है।
स्पार्क दो तरीकों से Hadoop का उपयोग करता है - एक है storage और दूसरा है processing। चूंकि स्पार्क की अपनी क्लस्टर प्रबंधन संगणना है, इसलिए यह केवल भंडारण उद्देश्य के लिए Hadoop का उपयोग करता है।
अपाचे स्पार्क
अपाचे स्पार्क एक लाइटनिंग-फास्ट क्लस्टर कंप्यूटिंग तकनीक है, जिसे तेज गणना के लिए डिज़ाइन किया गया है। यह Hadoop MapReduce पर आधारित है और यह MapReduce मॉडल को कुशलता से अधिक प्रकार की गणनाओं के लिए उपयोग करने के लिए विस्तारित करता है, जिसमें इंटरैक्टिव क्वेरी और स्ट्रीम प्रोसेसिंग शामिल है। स्पार्क की मुख्य विशेषता इसकी हैin-memory cluster computing जो किसी एप्लिकेशन की प्रोसेसिंग स्पीड को बढ़ाता है।
स्पार्क को कई प्रकार के वर्कलोड जैसे कि बैच एप्लिकेशन, पुनरावृत्त एल्गोरिदम, इंटरएक्टिव क्वेरी और स्ट्रीमिंग को कवर करने के लिए डिज़ाइन किया गया है। संबंधित प्रणाली में इन सभी कार्यभार का समर्थन करने के अलावा, यह अलग-अलग उपकरणों को बनाए रखने के प्रबंधन के बोझ को कम करता है।
अपाचे स्पार्क का विकास
स्पार्क होडोप की उप परियोजना में से एक है जो 2009 में माटी ज़हरिया द्वारा यूसी बर्कले के एएमपीलैब में विकसित की गई थी। यह 2010 में बीएसडी लाइसेंस के तहत ओपन सोर्ड था। यह 2013 में अपाचे सॉफ्टवेयर फाउंडेशन को दान कर दिया गया था, और अब अपाचे स्पार्क फरवरी 2014 से शीर्ष स्तर की अपाचे परियोजना बन गई है।
अपाचे स्पार्क की विशेषताएं
अपाचे स्पार्क में निम्नलिखित विशेषताएं हैं।
Speed- स्पार्क Hadoop क्लस्टर में एक एप्लिकेशन को चलाने में मदद करता है, मेमोरी में 100 गुना तेज और डिस्क पर चलने पर 10 गुना तेज। डिस्क पर रीड / राइट ऑपरेशन की संख्या कम करके यह संभव है। यह इंटरमीडिएट प्रोसेसिंग डाटा को मेमोरी में स्टोर करता है।
Supports multiple languages- स्पार्क जावा, स्काला, या पायथन में निर्मित एपीआई प्रदान करता है। इसलिए, आप विभिन्न भाषाओं में एप्लिकेशन लिख सकते हैं। स्पार्क इंटरएक्टिव क्वेरी के लिए 80 उच्च-स्तरीय ऑपरेटरों के साथ आता है।
Advanced Analytics- स्पार्क न केवल 'मैप' और 'कम' का समर्थन करता है। यह SQL क्वेरी, स्ट्रीमिंग डेटा, मशीन लर्निंग (एमएल) और ग्राफ़ एल्गोरिदम का भी समर्थन करता है।
स्पार्क बिल्ट पर बनाया गया
निम्नलिखित आरेख तीन तरीकों को दर्शाता है कि स्पार्क को Hadoop घटकों के साथ कैसे बनाया जा सकता है।
नीचे बताया गया स्पार्क तैनाती के तीन तरीके हैं।
Standalone- स्पार्क स्टैंडअलोन तैनाती का मतलब है कि स्पार्क एचडीएफएस (हडोप डिस्ट्रीब्यूटेड फाइल सिस्टम) के शीर्ष पर जगह रखता है और स्पष्ट रूप से एचडीएफएस के लिए जगह आवंटित की जाती है। यहाँ, स्पार्क और MapReduce क्लस्टर पर सभी स्पार्क नौकरियों को कवर करने के लिए कंधे से कंधा मिलाकर चलेंगे।
Hadoop Yarn- Hadoop यार्न की तैनाती का मतलब है, बस, स्पार्क यार्न पर बिना किसी पूर्व-इंस्टॉलेशन या रूट एक्सेस के आवश्यक है। यह स्पार्क को Hadoop इकोसिस्टम या Hadoop स्टैक में एकीकृत करने में मदद करता है। यह अन्य घटकों को स्टैक के शीर्ष पर चलने की अनुमति देता है।
Spark in MapReduce (SIMR)- MapReduce में स्पार्क का उपयोग स्टैंडअलोन तैनाती के अलावा स्पार्क जॉब लॉन्च करने के लिए किया जाता है। SIMR के साथ, उपयोगकर्ता स्पार्क शुरू कर सकता है और बिना किसी प्रशासनिक पहुंच के इसके शेल का उपयोग कर सकता है।
स्पार्क के घटक
निम्नलिखित दृष्टांत स्पार्क के विभिन्न घटकों को दर्शाते हैं।
अपाचे स्पार्क कोर
स्पार्क कोर स्पार्क प्लेटफॉर्म के लिए अंतर्निहित सामान्य निष्पादन इंजन है जो अन्य सभी कार्यक्षमता पर बनाया गया है। यह बाहरी मेमोरी सिस्टम में इन-मेमोरी कंप्यूटिंग और रेफरेंसिंग डेटासेट प्रदान करता है।
स्पार्क एसक्यूएल
स्पार्क एसक्यूएल स्पार्क कोर के शीर्ष पर एक घटक है जो स्कीमाआरडीडी नामक एक नया डेटा एब्स्ट्रैक्शन पेश करता है, जो संरचित और अर्ध-संरचित डेटा के लिए समर्थन प्रदान करता है।
स्पार्क स्ट्रीमिंग
स्पार्क स्ट्रीमिंग, स्ट्रीमिंग एनालिटिक्स प्रदर्शन करने के लिए स्पार्क कोर की तीव्र शेड्यूलिंग क्षमता का लाभ उठाती है। यह मिनी-बैचों में डेटा को सम्मिलित करता है और डेटा के उन मिनी-बैचों पर आरडीडी (रेजिलिएंट डिस्ट्रीब्यूटेड डेटासेट्स) रूपांतरण करता है।
MLlib (मशीन लर्निंग लाइब्रेरी)
MLlib वितरित मेमोरी-आधारित स्पार्क वास्तुकला के कारण स्पार्क के ऊपर एक वितरित मशीन लर्निंग फ्रेमवर्क है। यह बेंचमार्क के अनुसार, एमएलबी डेवलपर्स द्वारा अल्टरनेटिंग लिस्ट स्क्वायर (एएलएस) कार्यान्वयन के खिलाफ किया जाता है। स्पार्क MLlib Hadoop डिस्क-आधारित संस्करण के रूप में नौ गुना तेज हैApache Mahout (इससे पहले कि महावत ने स्पार्क इंटरफ़ेस प्राप्त किया)।
GraphX
ग्राफएक्स स्पार्क के शीर्ष पर एक वितरित ग्राफ-प्रोसेसिंग ढांचा है। यह ग्राफ संगणना को व्यक्त करने के लिए एक एपीआई प्रदान करता है जो उपयोगकर्ता-परिभाषित रेखांकन को Pregel abstraction API का उपयोग करके मॉडल कर सकता है। यह इस अमूर्त के लिए एक अनुकूलित रनटाइम भी प्रदान करता है।
लचीला वितरित डेटासेट
रेसिलिएंट डिस्ट्रिब्यूटेड डेटसेट्स (आरडीडी) स्पार्क की एक मूलभूत डेटा संरचना है। यह वस्तुओं का एक अपरिवर्तित वितरित संग्रह है। RDD में प्रत्येक डेटासेट को तार्किक विभाजन में विभाजित किया गया है, जिसे क्लस्टर के विभिन्न नोड्स पर गणना की जा सकती है। RDD में उपयोगकर्ता-परिभाषित कक्षाओं सहित किसी भी प्रकार के पायथन, जावा या स्काला ऑब्जेक्ट शामिल हो सकते हैं।
औपचारिक रूप से, एक RDD एक केवल-पढ़ने के लिए, अभिलेखों का विभाजन संग्रह है। RDDs स्थिर भंडारण या अन्य RDDs के डेटा पर नियतात्मक संचालन के माध्यम से बनाया जा सकता है। RDD तत्वों का एक दोष-सहिष्णु संग्रह है जिसे समानांतर में संचालित किया जा सकता है।
RDDs बनाने के दो तरीके हैं - parallelizing आपके ड्राइवर प्रोग्राम में मौजूदा संग्रह, या referencing a dataset एक बाहरी भंडारण प्रणाली में, जैसे कि एक साझा फ़ाइल सिस्टम, HDFS, HBase, या Hadoop Input Format की पेशकश करने वाला कोई भी डेटा स्रोत।
स्पार्क RDD की अवधारणा का उपयोग तेज और कुशल MapReduce संचालन को प्राप्त करने के लिए करता है। चलिए पहले चर्चा करते हैं कि MapReduce के संचालन कैसे होते हैं और वे इतने कुशल क्यों नहीं हैं।
MapReduce में Data Sharing Slow है
MapReduce व्यापक रूप से एक क्लस्टर पर समानांतर, वितरित एल्गोरिदम के साथ बड़े डेटासेट को संसाधित करने और उत्पन्न करने के लिए अपनाया जाता है। यह उपयोगकर्ताओं को काम वितरण और गलती सहिष्णुता के बारे में चिंता किए बिना, उच्च-स्तरीय ऑपरेटरों के एक सेट का उपयोग करके, समानांतर संगणना लिखने की अनुमति देता है।
दुर्भाग्य से, अधिकांश वर्तमान रूपरेखाओं में, संगणनाओं के बीच डेटा का पुन: उपयोग करने का एकमात्र तरीका (दो MapReduce नौकरियों के बीच) एक बाहरी स्थिर भंडारण प्रणाली (Ex - HDFS) के लिए इसे लिखना है। यद्यपि यह ढांचा क्लस्टर के कम्प्यूटेशनल संसाधनों तक पहुंचने के लिए कई सार प्रदान करता है, फिर भी उपयोगकर्ता अधिक चाहते हैं।
दोनों Iterative तथा Interactiveअनुप्रयोगों को समानांतर नौकरियों में तेजी से डेटा साझा करने की आवश्यकता होती है। के कारण MapReduce में डेटा साझाकरण धीमा हैreplication, serialization, तथा disk IO। स्टोरेज सिस्टम के बारे में, अधिकांश हडॉप एप्लिकेशन, वे HDFS पढ़ने-लिखने के संचालन में 90% से अधिक समय बिताते हैं।
MapReduce पर Iterative ऑपरेशंस
बहु-चरण अनुप्रयोगों में कई संगणनाओं के बीच मध्यवर्ती परिणामों का पुन: उपयोग करें। निम्न चित्रण यह बताता है कि MapReduce पर पुनरावृत्त संचालन करते समय वर्तमान रूपरेखा कैसे काम करती है। यह डेटा प्रतिकृति, डिस्क I / O और क्रमांकन के कारण पर्याप्त ओवरहेड्स को उकसाता है, जिससे सिस्टम धीमा हो जाता है।
MapReduce पर इंटरएक्टिव संचालन
उपयोगकर्ता डेटा के एक ही सबसेट पर तदर्थ क्वेरी चलाता है। प्रत्येक क्वेरी स्थिर स्टोरेज पर डिस्क I / O करेगी, जो एप्लिकेशन निष्पादन समय पर हावी हो सकती है।
निम्नलिखित चित्रण यह बताता है कि MapReduce पर संवादात्मक प्रश्न करते समय वर्तमान रूपरेखा कैसे काम करती है।
स्पार्क आरडीडी का उपयोग करके डेटा शेयरिंग
के कारण MapReduce में डेटा साझाकरण धीमा है replication, serialization, तथा disk IO। Hadoop के अधिकांश एप्लिकेशन, वे HDFS पढ़ने-लिखने के संचालन में 90% से अधिक समय बिताते हैं।
इस समस्या को स्वीकार करते हुए, शोधकर्ताओं ने अपाचे स्पार्क नामक एक विशेष रूपरेखा विकसित की। चिंगारी का मुख्य विचार हैResilient Distributed Dएसेट्स (आरडीडी); यह इन-मेमोरी प्रोसेसिंग कंप्यूटेशन का समर्थन करता है। इसका मतलब है, यह नौकरियों में एक ऑब्जेक्ट के रूप में मेमोरी की स्थिति को संग्रहीत करता है और उन नौकरियों के बीच ऑब्जेक्ट को साझा करना है। नेटवर्क और डिस्क की तुलना में मेमोरी में डेटा शेयरिंग 10 से 100 गुना तेज है।
आइए अब यह पता लगाने की कोशिश करते हैं कि स्पार्क आरडीडी में पुनरावृत्ति और संवादात्मक संचालन कैसे होता है।
स्पार्क आरडीडी पर Iterative ऑपरेशन
नीचे दिया गया चित्र स्पार्क आरडीडी पर चलने वाले संचालन को दर्शाता है। यह स्थिर भंडारण (डिस्क) के बजाय एक वितरित मेमोरी में मध्यवर्ती परिणाम संग्रहीत करेगा और सिस्टम को तेज करेगा।
Note - यदि वितरित मेमोरी (RAM) मध्यवर्ती परिणाम (JOB की स्थिति) को संग्रहीत करने के लिए पर्याप्त है, तो यह उन परिणामों को डिस्क पर संग्रहीत करेगा।
स्पार्क आरडीडी पर इंटरएक्टिव संचालन
यह चित्रण स्पार्क आरडीडी पर इंटरैक्टिव संचालन को दर्शाता है। यदि डेटा के एक ही सेट पर बार-बार अलग-अलग क्वेरीज़ रन की जाती हैं, तो बेहतर निष्पादन समय के लिए इस विशेष डेटा को मेमोरी में रखा जा सकता है।
डिफ़ॉल्ट रूप से, प्रत्येक रूपांतरित RDD को उस पर कार्रवाई चलाने पर हर बार पुनः प्राप्त किया जा सकता है। हालाँकि, आप भी कर सकते हैंpersistमेमोरी में एक RDD, जिस स्थिति में स्पार्क तत्वों को क्लस्टर पर बहुत तेज़ पहुँच के लिए रखेगा, अगली बार जब आप इसे क्वेरी करेंगे। डिस्क पर RDD को बनाए रखने के लिए समर्थन भी है, या कई नोड्स में दोहराया गया है।
स्पार्क हडोप की उप-परियोजना है। इसलिए, स्पार्क को लिनक्स आधारित प्रणाली में स्थापित करना बेहतर है। निम्न चरण दिखाते हैं कि अपाचे स्पार्क को कैसे स्थापित किया जाए।
चरण 1: जावा स्थापना का सत्यापन
स्पार्क को स्थापित करने में जावा इंस्टॉलेशन अनिवार्य चीजों में से एक है। जावा संस्करण को सत्यापित करने के लिए निम्न कमांड का प्रयास करें।
$java -version
यदि जावा पहले से ही आपके सिस्टम में स्थापित है, तो आपको निम्न प्रतिक्रिया देखने को मिलेगी -
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
यदि आपके पास जावा आपके सिस्टम पर स्थापित नहीं है, तो अगले चरण पर आगे बढ़ने से पहले जावा स्थापित करें।
चरण 2: स्कैला स्थापना को सत्यापित करना
स्पार्क को लागू करने के लिए आपको स्कैला भाषा चाहिए। तो आइए हम निम्नलिखित आदेश का उपयोग करके स्काला इंस्टॉलेशन को सत्यापित करते हैं।
$scala -version
यदि आपके सिस्टम पर पहले से ही Scala स्थापित है, तो आपको निम्न प्रतिक्रिया देखने को मिलेगी -
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
यदि आपके पास अपने सिस्टम पर Scala स्थापित नहीं है, तो आप Scala स्थापना के लिए अगले चरण पर आगे बढ़ें।
चरण 3: स्कैला डाउनलोड करना
निम्न लिंक पर जाकर स्काला का नवीनतम संस्करण डाउनलोड करें । इस ट्यूटोरियल के लिए, हम scala-2.11.6 संस्करण का उपयोग कर रहे हैं। डाउनलोड करने के बाद, आपको डाउनलोड फ़ोल्डर में स्काला टार फाइल मिलेगी।
चरण 4: स्कैला स्थापित करना
स्केल स्थापित करने के लिए नीचे दिए गए चरणों का पालन करें।
स्काला टार फ़ाइल को निकालें
Scala tar फ़ाइल निकालने के लिए निम्न कमांड टाइप करें।
$ tar xvf scala-2.11.6.tgz
Scala सॉफ्टवेयर फाइल्स को मूव करें
स्काला सॉफ़्टवेयर फ़ाइलों को संबंधित निर्देशिका में ले जाने के लिए निम्न आदेशों का उपयोग करें (/usr/local/scala)।
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit
स्काला के लिए पाथ सेट करें
Scala के लिए PATH की स्थापना के लिए निम्न कमांड का उपयोग करें।
$ export PATH = $PATH:/usr/local/scala/bin
स्काला इंस्टालेशन का सत्यापन
स्थापना के बाद, इसे सत्यापित करना बेहतर है। Scala स्थापना को सत्यापित करने के लिए निम्न कमांड का उपयोग करें।
$scala -version
यदि आपके सिस्टम पर पहले से ही Scala स्थापित है, तो आपको निम्न प्रतिक्रिया देखने को मिलेगी -
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
चरण 5: अपाचे स्पार्क डाउनलोड करना
निम्नलिखित लिंक पर जाकर स्पार्क का नवीनतम संस्करण डाउनलोड करें । इस ट्यूटोरियल के लिए, हम उपयोग कर रहे हैंspark-1.3.1-bin-hadoop2.6संस्करण। इसे डाउनलोड करने के बाद, आपको डाउनलोड फ़ोल्डर में स्पार्क टार फाइल मिलेगी।
चरण 6: स्पार्क स्थापित करना
स्पार्क स्थापित करने के लिए नीचे दिए गए चरणों का पालन करें।
स्पार्क टार निकालना
स्पार्क टार फाइल निकालने के लिए निम्न कमांड।
$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz
स्पार्क सॉफ़्टवेयर फ़ाइलों को ले जाना
स्पार्क सॉफ़्टवेयर फ़ाइलों को संबंधित निर्देशिका में ले जाने के लिए निम्न आदेश (/usr/local/spark)।
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit
स्पार्क के लिए पर्यावरण की स्थापना
निम्नलिखित पंक्ति को ~ में जोड़ें/.bashrcफ़ाइल। इसका अर्थ है उस स्थान को जोड़ना, जहां स्पार्क सॉफ्टवेयर फ़ाइल पथ चर के लिए स्थित है।
export PATH=$PATH:/usr/local/spark/bin
~ / .Bashrc फ़ाइल की सोर्सिंग के लिए निम्न कमांड का उपयोग करें।
$ source ~/.bashrc
चरण 7: स्पार्क इंस्टॉलेशन को सत्यापित करना
स्पार्क खोल खोलने के लिए निम्नलिखित कमांड लिखिए।
$spark-shell
यदि स्पार्क सफलतापूर्वक स्थापित किया गया है, तो आपको निम्न आउटपुट मिलेगा।
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
स्पार्क कोर पूरी परियोजना का आधार है। यह वितरित प्रेषण, शेड्यूलिंग और बुनियादी I / O फ़ंक्शंस प्रदान करता है। स्पार्क RDD के रूप में जाना जाने वाला एक विशेष मौलिक डेटा संरचना का उपयोग करता है (लचीला वितरित डेटासेट) जो मशीनों के पार डेटा विभाजन का एक तार्किक संग्रह है। RDD को दो तरह से बनाया जा सकता है; एक बाह्य भंडारण प्रणालियों में डेटासेट संदर्भित करके है और दूसरा मौजूदा RDD पर रूपांतरण (जैसे मानचित्र, फ़िल्टर, रिड्यूसर, ज्वाइन) लगाने से है।
RDD अमूर्त एक भाषा-एकीकृत एपीआई के माध्यम से उजागर होता है। यह प्रोग्रामिंग जटिलता को आसान बनाता है क्योंकि जिस तरह से RDDs में एप्लिकेशन हेरफेर करते हैं वह डेटा के स्थानीय संग्रह में हेरफेर करने के समान है।
स्पार्क शेल
स्पार्क एक इंटरैक्टिव शेल प्रदान करता है - डेटा का इंटरेक्टिव रूप से विश्लेषण करने के लिए एक शक्तिशाली उपकरण। यह स्काला या पाइथन भाषा में उपलब्ध है। स्पार्क का प्राथमिक अमूर्त वस्तुओं का वितरित संग्रह है, जिसे रेजिस्टेंट डिस्ट्रीब्यूटेड डेटसेट (आरडीडी) कहा जाता है। RDDs को Hadoop Input Formats (जैसे HDFS फाइलें) से या अन्य RDDs को रूपांतरित करके बनाया जा सकता है।
स्पार्क खोल खोलें
स्पार्क शेल को खोलने के लिए निम्न कमांड का उपयोग किया जाता है।
$ spark-shell
सरल RDD बनाएँ
पाठ फ़ाइल से एक सरल RDD बनाते हैं। एक साधारण RDD बनाने के लिए निम्न कमांड का उपयोग करें।
scala> val inputfile = sc.textFile(“input.txt”)
उपरोक्त कमांड के लिए आउटपुट है
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
स्पार्क आरडीडी एपीआई कुछ का परिचय देता है Transformations और कुछ Actions RDD में हेरफेर करने के लिए।
RDD ट्रांसफ़ॉर्मेशन
RDD परिवर्तन नए RDD के लिए सूचक लौटाता है और आपको RDD के बीच निर्भरता बनाने की अनुमति देता है। निर्भरता श्रृंखला में प्रत्येक RDD (स्ट्रिंग ऑफ डिपेंडेंसी) के पास अपने डेटा की गणना करने के लिए एक फ़ंक्शन होता है और इसके मूल RDD के लिए एक संकेतक (निर्भरता) होता है।
स्पार्क आलसी है, इसलिए कुछ भी निष्पादित नहीं किया जाएगा जब तक कि आप कुछ परिवर्तन या कार्रवाई को नहीं बुलाते हैं जो रोजगार सृजन और निष्पादन को गति प्रदान करेगा। शब्द-गणना उदाहरण के निम्नलिखित स्निपेट को देखें।
इसलिए, RDD परिवर्तन डेटा का एक सेट नहीं है, लेकिन एक कार्यक्रम में एक कदम (शायद एकमात्र कदम हो सकता है) स्पार्क को बता रहा है कि डेटा कैसे प्राप्त करें और इसके साथ क्या करना है।
S.No | रूपांतरण और अर्थ |
---|---|
1 | map(func) फ़ंक्शन के माध्यम से स्रोत के प्रत्येक तत्व को पारित करके एक नया वितरित डेटासेट देता है func। |
2 | filter(func) स्रोत के उन तत्वों का चयन करके गठित एक नया डेटासेट देता है, जिस पर func सच लौटाता है। |
3 | flatMap(func) मानचित्र के समान, लेकिन प्रत्येक इनपुट आइटम को 0 या अधिक आउटपुट आइटम पर मैप किया जा सकता है (इसलिए फंक को एकल आइटम के बजाय Seq लौटना चाहिए)। |
4 | mapPartitions(func) मानचित्र के समान, लेकिन आरडीडी के प्रत्येक विभाजन (ब्लॉक) पर अलग से चलता है, इसलिए func टाइप I का होना चाहिए Iterator <T> ⇒ Iterator <U> जब टाइप T के RDD पर चल रहा हो। |
5 | mapPartitionsWithIndex(func) मानचित्र विभाजन के समान, लेकिन यह भी प्रदान करता है func एक पूर्णांक मान के साथ विभाजन के सूचकांक का प्रतिनिधित्व करता है, इसलिए func प्रकार का होना चाहिए (Int, Iterator <T>) ⇒ Iterator <U> जब टाइप T के RDD पर चल रहा हो। |
6 | sample(withReplacement, fraction, seed) नमूना a fraction डेटा के साथ या प्रतिस्थापन के बिना, एक यादृच्छिक संख्या जनरेटर बीज का उपयोग कर। |
7 | union(otherDataset) एक नया डेटासेट देता है जिसमें स्रोत डेटासेट और तर्क में तत्वों का मिलन होता है। |
8 | intersection(otherDataset) एक नया RDD लौटाता है जिसमें स्रोत डेटासेट और तर्क में तत्वों का प्रतिच्छेदन होता है। |
9 | distinct([numTasks]) एक नया डेटासेट देता है जिसमें स्रोत डेटासेट के अलग-अलग तत्व होते हैं। |
10 | groupByKey([numTasks]) जब (K, V) जोड़े के डेटासेट पर कॉल किया जाता है, तो (K, Iterable <V>) जोड़े के डेटासेट लौटाता है। Note - यदि आप प्रत्येक कुंजी के ऊपर एकत्रीकरण (जैसे योग या औसत) करने के लिए समूहीकरण कर रहे हैं, तो कमबाइके या समुच्चयबैंक का उपयोग करके बेहतर प्रदर्शन प्राप्त करेंगे। |
1 1 | reduceByKey(func, [numTasks]) जब (K, V) जोड़े के डेटासेट पर कॉल किया जाता है, तो (K, V) जोड़े के एक डेटासेट देता है, जहां दिए गए कम फंक्शन फंक का उपयोग करके प्रत्येक कुंजी के मानों को एकत्र किया जाता है , जो कि टाइप (V, V) ⇒ V का होना चाहिए । जैसे groupByKey में, कार्य को कम करने की संख्या एक वैकल्पिक दूसरे तर्क के माध्यम से कॉन्फ़िगर करने योग्य है। |
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) जब (K, V) जोड़े के डेटासेट पर कॉल किया जाता है, तो (K, U) जोड़े के एक डेटासेट देता है, जहां दिए गए कंबाइन फ़ंक्शन और एक तटस्थ "शून्य" मान का उपयोग करके प्रत्येक कुंजी के मानों को एकत्र किया जाता है। एक एकीकृत मूल्य प्रकार को अनुमति देता है जो अनावश्यक आवंटन से बचने के दौरान इनपुट मूल्य प्रकार से भिन्न होता है। GroupByKey की तरह, वैकल्पिक दूसरे तर्क के माध्यम से कम कार्यों की संख्या कॉन्फ़िगर करने योग्य है। |
13 | sortByKey([ascending], [numTasks]) जब (K, V) युग्मों के डेटासेट पर कॉल किया जाता है, जहां K क्रमबद्ध होते हैं, तो K (K, V) जोड़े के डेटासेट्स को आरोही या अवरोही क्रम में कुंजियों द्वारा क्रमबद्ध किया जाता है, जैसा कि बूलियन आरोही तर्क में निर्दिष्ट है। |
14 | join(otherDataset, [numTasks]) जब (K, V) और (K, W) प्रकार के डेटासेट पर कॉल किया जाता है, तो प्रत्येक कुंजी के लिए तत्वों के सभी जोड़े के साथ (K, (V, W)) जोड़े का एक डेटासेट लौटाता है। बाहरी जॉइंट को leftOuterJoin, rightOuterJoin, और fullOuterJoin के माध्यम से सपोर्ट किया जाता है। |
15 | cogroup(otherDataset, [numTasks]) जब (K, V) और (K, W) प्रकार के डेटासेट पर कॉल किया जाता है, तो (K, (Iterable <V>, Iterable <W>)) ट्यूपल्स का एक डेटासेट लौटाता है। इस ऑपरेशन को ग्रुप विथ ग्रुप भी कहा जाता है। |
16 | cartesian(otherDataset) जब T और U के डेटासेट पर कॉल किया जाता है, तो (T, U) जोड़े (तत्वों के सभी जोड़े) का एक डेटासेट लौटाता है। |
17 | pipe(command, [envVars]) खोल के माध्यम से RDD के प्रत्येक विभाजन को पाइप करें, उदाहरण के लिए एक पर्ल या बैश स्क्रिप्ट। RDD तत्वों को प्रक्रिया के स्टैडेन को लिखा जाता है और इसके स्टडआउट को लाइनों का आउटपुट स्ट्रिंग्स के RDD के रूप में दिया जाता है। |
18 | coalesce(numPartitions) आरडीडी में विभाजन की संख्या को अंकन में घटाएं। बड़े डेटासेट को फ़िल्टर करने के बाद अधिक कुशलता से संचालन चलाने के लिए उपयोगी। |
19 | repartition(numPartitions) RDD में डेटा को फेरबदल करना या तो अधिक या कम विभाजन बनाने के लिए बेतरतीब ढंग से और उनके बीच इसे संतुलित करना। यह हमेशा नेटवर्क पर सभी डेटा फेरबदल करता है। |
20 | repartitionAndSortWithinPartitions(partitioner) दिए गए विभाजन के अनुसार आरडीडी को पुनर्संरचना करें और प्रत्येक परिणामी विभाजन के भीतर, उनकी कुंजियों द्वारा रिकॉर्ड को सॉर्ट करें। यह रिपर्टिंग को कॉल करने और फिर प्रत्येक विभाजन के भीतर छाँटने की तुलना में अधिक कुशल है क्योंकि यह छँटाई मशीनरी में नीचे की ओर धकेल सकता है। |
कार्रवाई
S.No | क्रिया और अर्थ |
---|---|
1 | reduce(func) किसी फ़ंक्शन का उपयोग करके डेटासेट के तत्वों को अलग करें func(जो दो तर्क लेता है और एक लौटता है)। फ़ंक्शन को कम्यूटेटिव और साहचर्य होना चाहिए ताकि इसे समानांतर में सही ढंग से गणना की जा सके। |
2 | collect() ड्राइवर प्रोग्राम में एक सरणी के रूप में डेटासेट के सभी तत्वों को लौटाता है। यह आमतौर पर एक फिल्टर या अन्य ऑपरेशन के बाद उपयोगी होता है जो डेटा का एक छोटा सा सबसेट लौटाता है। |
3 | count() डेटासेट में तत्वों की संख्या लौटाता है। |
4 | first() डेटासेट का पहला तत्व देता है (लेने के समान) (1)। |
5 | take(n) पहले के साथ एक सरणी देता है n डेटासेट के तत्व। |
6 | takeSample (withReplacement,num, [seed]) एक यादृच्छिक नमूने के साथ एक सरणी देता है num डेटासेट के तत्व, प्रतिस्थापन के साथ या बिना, यादृच्छिक संख्या जनरेटर बीज को वैकल्पिक रूप से पूर्व-निर्दिष्ट करते हैं। |
7 | takeOrdered(n, [ordering]) पहले लौटता है n RDD के तत्व या तो उनके प्राकृतिक क्रम या एक कस्टम तुलनित्र का उपयोग करते हैं। |
8 | saveAsTextFile(path) स्थानीय फ़ाइल सिस्टम, HDFS या किसी अन्य Hadoop- समर्थित फ़ाइल सिस्टम में दी गई निर्देशिका में डेटासेट के तत्वों को एक टेक्स्ट फ़ाइल (या टेक्स्ट फ़ाइलों का सेट) के रूप में लिखता है। स्पार्क कॉल के प्रत्येक तत्व को फ़ाइल में पाठ की एक पंक्ति में बदलने के लिए कॉल करता है। |
9 | saveAsSequenceFile(path) (Java and Scala) स्थानीय फाइल सिस्टम, HDFS या किसी अन्य Hadoop समर्थित फ़ाइल सिस्टम में दिए गए पथ में एक Hadoop SequenceFile के रूप में डेटासेट के तत्वों को लिखता है। यह कुंजी-मूल्य जोड़े के RDD पर उपलब्ध है जो Hadoop के Writable इंटरफ़ेस को कार्यान्वित करता है। स्काला में, यह उन प्रकारों पर भी उपलब्ध है जो अनुमानित रूप से Writable (परिवर्तनीय) में शामिल हैं, जैसे Int, Double, String, आदि जैसे बुनियादी प्रकारों के लिए रूपांतरण। |
10 | saveAsObjectFile(path) (Java and Scala) जावा सीरियलाइज़ेशन का उपयोग करके डेटासेट के तत्वों को एक सरल प्रारूप में लिखा जाता है, जिसे बाद में स्पार्ककोटेक्स्ट.ओबजेक्टफाइल () का उपयोग करके लोड किया जा सकता है। |
1 1 | countByKey() केवल RDD प्रकार (K, V) पर उपलब्ध है। प्रत्येक कुंजी की गिनती के साथ (K, Int) जोड़े का हैशमप लौटाता है। |
12 | foreach(func) एक फ़ंक्शन चलाता है funcडेटासेट के प्रत्येक तत्व पर। यह आमतौर पर साइड इफेक्ट्स के लिए किया जाता है, जैसे कि एक्सुमुलेटर को अपडेट करना या बाहरी स्टोरेज सिस्टम के साथ बातचीत करना। Noteफॉर्च्यूनर () के बाहर Accumulators के अलावा अन्य चर को संशोधित करने से अपरिभाषित व्यवहार हो सकता है। अधिक विवरण के लिए अंडरस्टैंडिंग देखें। |
RDD के साथ प्रोग्रामिंग
एक उदाहरण की मदद से RDD प्रोग्रामिंग में कुछ RDD परिवर्तनों और कार्यों के कार्यान्वयन को देखते हैं।
उदाहरण
एक शब्द गणना उदाहरण पर विचार करें - यह एक दस्तावेज़ में दिखाई देने वाले प्रत्येक शब्द को गिनता है। निम्नलिखित पाठ को एक इनपुट के रूप में देखें और इसे एक के रूप में सहेजा जाता हैinput.txt एक घर निर्देशिका में फ़ाइल।
input.txt - इनपुट फ़ाइल।
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
दिए गए उदाहरण को निष्पादित करने के लिए नीचे दी गई प्रक्रिया का पालन करें।
ओपन स्पार्क-शेल
स्पार्क शेल को खोलने के लिए निम्न कमांड का उपयोग किया जाता है। आमतौर पर, स्पला स्केला का उपयोग करके बनाया जाता है। इसलिए, एक स्पार्क कार्यक्रम स्काला पर्यावरण पर चलता है।
$ spark-shell
यदि स्पार्क शेल सफलतापूर्वक खुलता है तो आपको निम्न आउटपुट मिलेगा। आउटपुट की अंतिम पंक्ति को देखें "स्पार्क संदर्भ एससी के रूप में उपलब्ध है" का अर्थ है कि स्पार्क कंटेनर स्वचालित रूप से नाम के साथ स्पार्क संदर्भ ऑब्जेक्ट बनाया गया हैsc। किसी प्रोग्राम का पहला चरण शुरू करने से पहले, SparkContext ऑब्जेक्ट बनाया जाना चाहिए।
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
एक RDD बनाएँ
सबसे पहले, हमें Spark-Scala API का उपयोग करके इनपुट फ़ाइल को पढ़ना होगा और RDD बनाना होगा।
निम्न कमांड का उपयोग किसी दिए गए स्थान से फाइल पढ़ने के लिए किया जाता है। यहां, नए RDD को इनपुटफाइल के नाम से बनाया गया है। स्ट्रिंग जिसे टेक्स्टफाइल ("") विधि में एक तर्क के रूप में दिया गया है, इनपुट फ़ाइल नाम के लिए निरपेक्ष पथ है। हालांकि, यदि केवल फ़ाइल नाम दिया गया है, तो इसका मतलब है कि इनपुट फ़ाइल वर्तमान स्थान पर है।
scala> val inputfile = sc.textFile("input.txt")
शब्द गणना परिवर्तन निष्पादित करें
हमारा उद्देश्य शब्दों को एक फ़ाइल में गिनना है। प्रत्येक पंक्ति को शब्दों में विभाजित करने के लिए एक फ्लैट नक्शा बनाएं (flatMap(line ⇒ line.split(“ ”))।
अगला, प्रत्येक शब्द को एक मान के साथ एक कुंजी के रूप में पढ़ें ‘1’ (<कुंजी, मूल्य> = <शब्द, 1>) मानचित्र फ़ंक्शन का उपयोग करके (map(word ⇒ (word, 1))।
अंत में, समान कुंजियों का मान जोड़कर उन कुंजियों को कम करें (reduceByKey(_+_))।
शब्द गणना तर्क को निष्पादित करने के लिए निम्नलिखित कमांड का उपयोग किया जाता है। इसे निष्पादित करने के बाद, आपको कोई आउटपुट नहीं मिलेगा क्योंकि यह कोई क्रिया नहीं है, यह एक परिवर्तन है; एक नए आरडीडी को इंगित करना या दिए गए डेटा के साथ क्या करना है, इस पर स्पार्क बताना)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
वर्तमान आरडीडी
RDD के साथ काम करते समय, यदि आप वर्तमान RDD के बारे में जानना चाहते हैं, तो निम्नलिखित कमांड का उपयोग करें। यह आपको डीबगिंग के लिए वर्तमान RDD और उसकी निर्भरता के बारे में विवरण दिखाएगा।
scala> counts.toDebugString
ट्रांसफॉर्मेशन कैशिंग
आप उस पर लगातार () या कैश () विधियों का उपयोग करके एक RDD को चिह्नित किया जा सकता है। पहली बार इसकी गणना एक कार्रवाई में की जाती है, इसे नोड्स पर स्मृति में रखा जाएगा। स्मृति में मध्यवर्ती परिवर्तनों को संग्रहीत करने के लिए निम्न आदेश का उपयोग करें।
scala> counts.cache()
क्रिया को लागू करना
एक क्रिया को लागू करना, जैसे सभी परिवर्तनों को संग्रहीत करना, एक पाठ फ़ाइल में परिणाम। SaveAsTextFile ("") विधि के लिए स्ट्रिंग तर्क आउटपुट फ़ोल्डर का पूर्ण पथ है। पाठ फ़ाइल में आउटपुट सहेजने के लिए निम्न कमांड का प्रयास करें। निम्नलिखित उदाहरण में, 'आउटपुट' फ़ोल्डर चालू स्थान पर है।
scala> counts.saveAsTextFile("output")
आउटपुट जाँच रहा है
होम डायरेक्टरी में जाने के लिए एक और टर्मिनल खोलें (जहां दूसरे टर्मिनल में स्पार्क निष्पादित होता है)। आउटपुट निर्देशिका की जाँच के लिए निम्न आदेशों का उपयोग करें।
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
निम्न कमांड का उपयोग आउटपुट को देखने के लिए किया जाता है Part-00000 फ़ाइलें।
[hadoop@localhost output]$ cat part-00000
उत्पादन
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
निम्न कमांड का उपयोग आउटपुट को देखने के लिए किया जाता है Part-00001 फ़ाइलें।
[hadoop@localhost output]$ cat part-00001
उत्पादन
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
संयुक्त राष्ट्र ने भंडारण को जारी रखा
UN-persisting से पहले, यदि आप इस एप्लिकेशन के लिए उपयोग किए जाने वाले संग्रहण स्थान को देखना चाहते हैं, तो अपने ब्राउज़र में निम्न URL का उपयोग करें।
http://localhost:4040
आपको निम्न स्क्रीन दिखाई देगी, जो एप्लिकेशन के लिए उपयोग किए जाने वाले संग्रहण स्थान को दिखाती है, जो स्पार्क शेल पर चल रहे हैं।
यदि आप विशेष RDD के संग्रहण स्थान को UN- जारी रखना चाहते हैं, तो निम्न कमांड का उपयोग करें।
Scala> counts.unpersist()
आपको आउटपुट निम्नानुसार दिखाई देगा -
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
ब्राउज़र में भंडारण स्थान की पुष्टि करने के लिए, निम्न URL का उपयोग करें।
http://localhost:4040/
आपको निम्न स्क्रीन दिखाई देगी। यह एप्लिकेशन के लिए उपयोग किए जाने वाले संग्रहण स्थान को दिखाता है, जो स्पार्क शेल पर चल रहे हैं।
स्पार्क एप्लिकेशन, स्पार्क-सबमिट का उपयोग करते हुए, एक शेल कमांड है जिसका उपयोग स्पार्क एप्लिकेशन को एक क्लस्टर पर तैनात करने के लिए किया जाता है। यह एक समान इंटरफ़ेस के माध्यम से सभी संबंधित क्लस्टर प्रबंधकों का उपयोग करता है। इसलिए, आपको प्रत्येक के लिए अपने एप्लिकेशन को कॉन्फ़िगर करने की आवश्यकता नहीं है।
उदाहरण
आइए हम शब्द गणना का एक ही उदाहरण लेते हैं, हमने शेल कमांड का उपयोग करते हुए पहले उपयोग किया था। यहां, हम उसी उदाहरण को स्पार्क एप्लिकेशन के रूप में मानते हैं।
नमूना इनपुट
निम्न पाठ इनपुट डेटा है और नाम वाली फ़ाइल है in.txt।
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
निम्नलिखित कार्यक्रम को देखो -
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
उपरोक्त प्रोग्राम को एक फ़ाइल में सेव करें जिसका नाम है SparkWordCount.scala और इसे नामित उपयोगकर्ता-निर्धारित निर्देशिका में रखें spark-application।
Note - InputRDD को countRDD में रूपांतरित करते समय, हम फ़्लैट मैप () का उपयोग लाइनों के लिए (टेक्स्ट फ़ाइल से) शब्दों में, शब्द आवृत्ति की गणना के लिए मानचित्र () विधि और प्रत्येक पुनरावृत्ति की गणना के लिएBeyKey () विधि का उपयोग कर रहे हैं।
इस एप्लिकेशन को सबमिट करने के लिए निम्न चरणों का उपयोग करें। में सभी चरणों का निष्पादन करेंspark-application टर्मिनल के माध्यम से निर्देशिका।
चरण 1: स्पार्क जा को डाउनलोड करें
संकलन के लिए स्पार्क कोर जार आवश्यक है, इसलिए, स्पार्क-कोर_2.10-1.3.0. jar को निम्न लिंक से डाउनलोड करें स्पार्क कोर जार और डाउनलोड निर्देशिका से जार फ़ाइल को स्थानांतरित करेंspark-application निर्देशिका।
चरण 2: संकलन कार्यक्रम
नीचे दिए गए आदेश का उपयोग करके उपरोक्त कार्यक्रम को संकलित करें। इस कमांड को स्पार्क-एप्लिकेशन डायरेक्टरी से निष्पादित किया जाना चाहिए। यहाँ,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar स्पार्क लाइब्रेरी से लिया गया हैडॉप सपोर्ट जार है।
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
चरण 3: एक जार बनाएँ
निम्नलिखित कमांड का उपयोग करके स्पार्क एप्लिकेशन की जार फ़ाइल बनाएं। यहाँ,wordcount जार फ़ाइल के लिए फ़ाइल का नाम है।
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
चरण 4: स्पार्क एप्लिकेशन सबमिट करें
निम्नलिखित कमांड का उपयोग करके स्पार्क एप्लिकेशन सबमिट करें -
spark-submit --class SparkWordCount --master local wordcount.jar
यदि इसे सफलतापूर्वक निष्पादित किया जाता है, तो आपको नीचे दिए गए आउटपुट मिलेंगे। OKनिम्नलिखित आउटपुट में देना उपयोगकर्ता की पहचान के लिए है और यह प्रोग्राम की अंतिम पंक्ति है। यदि आप निम्नलिखित आउटपुट को ध्यान से पढ़ें, तो आपको अलग-अलग चीजें मिलेंगी, जैसे कि -
- पोर्ट 42954 पर सफलतापूर्वक सेवा 'स्पार्कड्राइवर' शुरू की
- मेमोरीस्टोर की शुरुआत 267.3 एमबी की क्षमता के साथ हुई
- Http://192.168.1.217:4040 पर स्पार्कयूआई शुरू किया
- जोड़ा गया JAR फ़ाइल: /home/hadoop/piapplication/count.jar
- परिणाम 1 (SaveAsTextFile पर SparkPi.scala: 11) 0.566 सेकेंड में समाप्त हुआ
- Http://192.168.1.217:4040 पर स्पार्क वेब UI बंद कर दिया
- मैमोरीस्टोर को मंजूरी दे दी
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
चरण 5: आउटपुट की जाँच करना
कार्यक्रम के सफल निष्पादन के बाद, आपको निर्देशिका नाम मिलेगा outfile स्पार्क-एप्लिकेशन डायरेक्टरी में।
निम्न आदेशों का उपयोग संगठन निर्देशिका में फ़ाइलों की सूची को खोलने और जांचने के लिए किया जाता है।
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
में आउटपुट की जाँच के लिए आदेश part-00000 फ़ाइल हैं -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
भाग -00001 फ़ाइल में आउटपुट की जाँच के आदेश हैं -
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
'स्पार्क-सबमिट' कमांड के बारे में अधिक जानने के लिए निम्न अनुभाग पर जाएं।
स्पार्क-सबमिट सिंटैक्स
spark-submit [options] <app jar | python file> [app arguments]
विकल्प
S.No | विकल्प | विवरण |
---|---|---|
1 | --गुरुजी | स्पार्क: // होस्ट: पोर्ट, मेसोस: // होस्ट: पोर्ट, यार्न या स्थानीय। |
2 | --deploy मोड | चाहे ड्राइवर प्रोग्राम को स्थानीय रूप से लॉन्च करना हो ("क्लाइंट") या क्लस्टर ("क्लस्टर") के अंदर कार्यकर्ता मशीनों में से एक पर (डिफ़ॉल्ट: क्लाइंट)। |
3 | --कक्षा | आपके एप्लिकेशन का मुख्य वर्ग (जावा / स्काला ऐप्स के लिए)। |
4 | --name | आपके आवेदन का एक नाम। |
5 | --jars | चालक और निष्पादक वर्गपथ पर शामिल करने के लिए स्थानीय जार की कोम्मा-अलग सूची। |
6 | --packages | चालक और निष्पादक वर्गपथ पर शामिल करने के लिए जार के मावेन निर्देशांक की कोमा-पृथक सूची। |
7 | --repositories | अतिरिक्त दूरस्थ रिपॉजिटरी की कोमा से अलग की गई सूची - पैकटों के साथ दिए गए मावेन निर्देशांक की खोज करने के लिए। |
8 | --py-फ़ाइलें | पाइथन ऐप्स के लिए PYTHON PATH पर जगह पाने के लिए कोमा-अलग की सूची .zip, .egg, या .py फाइलें। |
9 | --files | प्रत्येक निष्पादक की कार्यशील निर्देशिका में रखी जाने वाली फाइलों की कोमा से अलग सूची। |
10 | --conf (प्रोप = वैल) | मनमाना स्पार्क विन्यास संपत्ति। |
1 1 | --properties-फ़ाइल | एक फ़ाइल का पथ जिसमें से अतिरिक्त गुण लोड करना है। यदि निर्दिष्ट नहीं है, तो यह conf / स्पार्क-चूक के लिए दिखेगा। |
12 | --driver स्मृति | ड्राइवर के लिए मेमोरी (जैसे 1000M, 2G) (डिफ़ॉल्ट: 512M)। |
13 | --driver-जावा-विकल्प | ड्राइवर को पास करने के लिए अतिरिक्त जावा विकल्प। |
14 | --driver-पुस्तकालय-पथ | ड्राइवर को पास करने के लिए अतिरिक्त लाइब्रेरी पथ प्रविष्टियाँ। |
15 | --driver स्तरीय पथ | ड्राइवर को पास करने के लिए अतिरिक्त क्लास पथ प्रविष्टियाँ। ध्यान दें कि - जार के साथ जोड़े गए जार स्वचालित रूप से क्लासपाथ में शामिल हैं। |
16 | --executor स्मृति | प्रति निष्पादन मेमोरी (जैसे 1000M, 2G) (डिफ़ॉल्ट: 1G)। |
17 | --proxy-उपयोगकर्ता | एप्लिकेशन सबमिट करते समय उपयोगकर्ता को प्रतिरूपण करने के लिए। |
18 | -हेल्प, -ह | यह सहायता संदेश दिखाएं और बाहर निकलें। |
19 | --वरबोस, -v | अतिरिक्त डिबग आउटपुट प्रिंट करें। |
20 | --version | वर्तमान स्पार्क के संस्करण को प्रिंट करें। |
21 | --driver-cores NUM | ड्राइवर के लिए कोर (डिफ़ॉल्ट: 1)। |
22 | --supervise | यदि दिया गया है, तो विफलता पर ड्राइवर को पुनरारंभ करें। |
23 | --kill | यदि दिया जाता है, तो निर्दिष्ट ड्राइवर को मारता है। |
24 | --स्थिति | यदि दिया गया है, तो निर्दिष्ट ड्राइवर की स्थिति का अनुरोध करता है। |
25 | --total-निष्पादक-कोर | सभी निष्पादकों के लिए कुल कोर। |
26 | --executor-कोर | प्रति निष्पादन कोर की संख्या। (डिफ़ॉल्ट: YARN मोड में 1, या स्टैंडअलोन मोड में कार्यकर्ता पर सभी उपलब्ध कोर)। |
स्पार्क में दो भिन्न प्रकार के साझा चर होते हैं - एक है broadcast variables और दूसरा है accumulators।
Broadcast variables - कुशलता से इस्तेमाल किया, बड़े मूल्यों को वितरित करने के लिए।
Accumulators - विशेष संग्रह की जानकारी एकत्र करने के लिए उपयोग किया जाता है।
प्रसारण चर
ब्रॉडकास्ट वेरिएबल प्रोग्रामर को प्रत्येक मशीन पर रीड-ओनली वैरिएबल कैश्ड रखने की अनुमति देता है, न कि कार्यों के साथ इसे कॉपी करने के बजाय। उनका उपयोग किया जा सकता है, उदाहरण के लिए, प्रत्येक नोड को एक बड़े इनपुट डेटासेट की एक प्रतिलिपि, एक कुशल तरीके से देने के लिए। स्पार्क संचार लागत को कम करने के लिए कुशल प्रसारण एल्गोरिदम का उपयोग करके प्रसारण चर वितरित करने का भी प्रयास करता है।
स्पार्क क्रियाओं को चरणों के एक सेट के माध्यम से निष्पादित किया जाता है, वितरित "फेरबदल" संचालन द्वारा अलग किया जाता है। स्पार्क स्वचालित रूप से प्रत्येक चरण के भीतर कार्यों द्वारा आवश्यक सामान्य डेटा को प्रसारित करता है।
इस तरह प्रसारित किए गए डेटा को क्रमबद्ध रूप में कैश किया जाता है और प्रत्येक कार्य को चलाने से पहले इसे डीरिशियल किया जाता है। इसका मतलब है कि स्पष्ट रूप से प्रसारण चर बनाने, केवल तब उपयोगी होता है जब कई चरणों में कार्यों को एक ही डेटा की आवश्यकता होती है या जब डेटा को deserialized रूप में कैशिंग करना महत्वपूर्ण होता है।
प्रसारण चर एक चर से बनाए जाते हैं v फोन करके SparkContext.broadcast(v)। प्रसारण चर चारों ओर एक आवरण हैv, और इसके मूल्य पर कॉल करके पहुँचा जा सकता है valueतरीका। नीचे दिया गया कोड यह दिखाता है -
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
Output -
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
प्रसारण चर बनाए जाने के बाद, इसका उपयोग मूल्य के बजाय किया जाना चाहिए v किसी भी कार्य में क्लस्टर पर चलता है, ताकि vनोड्स पर एक से अधिक बार शिप नहीं किया गया है। इसके अलावा, वस्तुv इसके प्रसारण के बाद संशोधित नहीं किया जाना चाहिए, ताकि यह सुनिश्चित हो सके कि सभी नोड्स को प्रसारण चर का समान मूल्य मिले।
एक्युमुलेटरों
Accumulators चर हैं जो केवल एक सहयोगी ऑपरेशन के माध्यम से "जोड़े" जाते हैं और इसलिए, समानांतर में कुशलता से समर्थित हो सकते हैं। उनका उपयोग काउंटरर्स को लागू करने के लिए किया जा सकता है (जैसा कि MapReduce में) या रकम। स्पार्क मूल रूप से संख्यात्मक प्रकारों के संचायक का समर्थन करता है, और प्रोग्रामर नए प्रकारों के लिए समर्थन जोड़ सकते हैं। यदि संचायक एक नाम के साथ बनाए जाते हैं, तो उन्हें प्रदर्शित किया जाएगाSpark’s UI। यह चल रहे चरणों की प्रगति को समझने के लिए उपयोगी हो सकता है (नोट - यह अभी तक पायथन में समर्थित नहीं है)।
प्रारंभिक मान से एक संचायक बनाया जाता है v फोन करके SparkContext.accumulator(v)। इसके बाद क्लस्टर पर चलने वाले टास्क को इसमें जोड़ा जा सकता हैaddविधि या + = ऑपरेटर (स्काला और पायथन में)। हालाँकि, वे इसके मूल्य को नहीं पढ़ सकते हैं। केवल चालक कार्यक्रम संचायक के मूल्य को पढ़ सकता है, इसके उपयोग सेvalue तरीका।
नीचे दिया गया कोड एक संचयकर्ता को दिखाता है जिसका उपयोग सरणी के तत्वों को जोड़ने के लिए किया जा रहा है -
scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
यदि आप उपरोक्त कोड का आउटपुट देखना चाहते हैं तो निम्न कमांड का उपयोग करें -
scala> accum.value
उत्पादन
res2: Int = 10
न्यूमेरिक आरडीडी संचालन
स्पार्क आपको पूर्वनिर्धारित एपीआई विधियों में से एक का उपयोग करके, संख्यात्मक डेटा पर विभिन्न ऑपरेशन करने की अनुमति देता है। स्पार्क के संख्यात्मक संचालन को एक स्ट्रीमिंग एल्गोरिथ्म के साथ लागू किया जाता है जो एक समय में मॉडल, एक तत्व के निर्माण की अनुमति देता है।
इन ऑपरेशनों की गणना की जाती है और एक के रूप में लौटाया जाता है StatusCounter बुलाकर वस्तु status() तरीका।
S.No | तरीके और अर्थ |
---|---|
1 | count() RDD में तत्वों की संख्या। |
2 | Mean() RDD में तत्वों का औसत। |
3 | Sum() RDD में तत्वों का कुल मूल्य। |
4 | Max() RDD में सभी तत्वों के बीच अधिकतम मूल्य। |
5 | Min() RDD में सभी तत्वों के बीच न्यूनतम मूल्य। |
6 | Variance() तत्वों की भिन्नता। |
7 | Stdev() मानक विचलन। |
यदि आप इन विधियों में से केवल एक का उपयोग करना चाहते हैं, तो आप सीधे RDD पर संबंधित विधि को कॉल कर सकते हैं।