अपाचे स्पार्क - तैनाती

स्पार्क एप्लिकेशन, स्पार्क-सबमिट का उपयोग करते हुए, एक शेल कमांड है जिसका उपयोग स्पार्क एप्लिकेशन को एक क्लस्टर पर तैनात करने के लिए किया जाता है। यह एक समान इंटरफ़ेस के माध्यम से सभी संबंधित क्लस्टर प्रबंधकों का उपयोग करता है। इसलिए, आपको प्रत्येक के लिए अपने एप्लिकेशन को कॉन्फ़िगर करने की आवश्यकता नहीं है।

उदाहरण

आइए हम शब्द गणना का एक ही उदाहरण लेते हैं, हमने शेल कमांड का उपयोग करते हुए पहले उपयोग किया था। यहां, हम उसी उदाहरण को स्पार्क एप्लिकेशन के रूप में मानते हैं।

नमूना इनपुट

निम्न पाठ इनपुट डेटा है और नाम वाली फ़ाइल है in.txt

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

निम्नलिखित कार्यक्रम को देखो -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

उपरोक्त प्रोग्राम को एक फ़ाइल में सेव करें जिसका नाम है SparkWordCount.scala और इसे नामित उपयोगकर्ता-निर्धारित निर्देशिका में रखें spark-application

Note - InputRDD को countRDD में रूपांतरित करते समय, हम फ़्लैट मैप () का उपयोग लाइनों के लिए (टेक्स्ट फ़ाइल से) शब्दों में, शब्द आवृत्ति की गणना के लिए मानचित्र () विधि और प्रत्येक पुनरावृत्ति की गणना के लिएBeyKey () विधि का उपयोग कर रहे हैं।

इस एप्लिकेशन को सबमिट करने के लिए निम्न चरणों का उपयोग करें। में सभी चरणों का निष्पादन करेंspark-application टर्मिनल के माध्यम से निर्देशिका।

चरण 1: स्पार्क जा को डाउनलोड करें

संकलन के लिए स्पार्क कोर जार आवश्यक है, इसलिए, स्पार्क-कोर_2.10-1.3.0. jar को निम्न लिंक से डाउनलोड करें स्पार्क कोर जार और डाउनलोड निर्देशिका से जार फ़ाइल को स्थानांतरित करेंspark-application निर्देशिका।

चरण 2: संकलन कार्यक्रम

नीचे दिए गए आदेश का उपयोग करके उपरोक्त कार्यक्रम को संकलित करें। इस कमांड को स्पार्क-एप्लिकेशन डायरेक्टरी से निष्पादित किया जाना चाहिए। यहाँ,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar स्पार्क लाइब्रेरी से लिया गया हैडॉप सपोर्ट जार है।

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

चरण 3: एक जार बनाएँ

निम्नलिखित कमांड का उपयोग करके स्पार्क एप्लिकेशन की जार फ़ाइल बनाएं। यहाँ,wordcount जार फ़ाइल के लिए फ़ाइल का नाम है।

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

चरण 4: स्पार्क एप्लिकेशन सबमिट करें

निम्नलिखित कमांड का उपयोग करके स्पार्क एप्लिकेशन सबमिट करें -

spark-submit --class SparkWordCount --master local wordcount.jar

यदि इसे सफलतापूर्वक निष्पादित किया जाता है, तो आपको नीचे दिए गए आउटपुट मिलेंगे। OKनिम्नलिखित आउटपुट में देना उपयोगकर्ता की पहचान के लिए है और यह प्रोग्राम की अंतिम पंक्ति है। यदि आप निम्नलिखित आउटपुट को ध्यान से पढ़ें, तो आपको अलग-अलग चीजें मिलेंगी, जैसे कि -

  • पोर्ट 42954 पर सफलतापूर्वक सेवा 'स्पार्कड्राइवर' शुरू की
  • मेमोरीस्टोर की शुरुआत 267.3 एमबी की क्षमता के साथ हुई
  • Http://192.168.1.217:4040 पर स्पार्कयूआई शुरू किया
  • जोड़ा गया JAR फ़ाइल: /home/hadoop/piapplication/count.jar
  • परिणाम 1 (SaveAsTextFile पर SparkPi.scala: 11) 0.566 सेकेंड में समाप्त हुआ
  • Http://192.168.1.217:4040 पर स्पार्क वेब UI बंद कर दिया
  • मैमोरीस्टोर को मंजूरी दे दी
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

चरण 5: आउटपुट की जाँच करना

कार्यक्रम के सफल निष्पादन के बाद, आपको निर्देशिका नाम मिलेगा outfile स्पार्क-एप्लिकेशन डायरेक्टरी में।

निम्न आदेशों का उपयोग संगठन निर्देशिका में फ़ाइलों की सूची को खोलने और जांचने के लिए किया जाता है।

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

में आउटपुट की जाँच के लिए आदेश part-00000 फ़ाइल हैं -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

भाग -00001 फ़ाइल में आउटपुट की जाँच के आदेश हैं -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

'स्पार्क-सबमिट' कमांड के बारे में अधिक जानने के लिए निम्न अनुभाग पर जाएं।

स्पार्क-सबमिट सिंटैक्स

spark-submit [options] <app jar | python file> [app arguments]

विकल्प

S.No विकल्प विवरण
1 --गुरुजी स्पार्क: // होस्ट: पोर्ट, मेसोस: // होस्ट: पोर्ट, यार्न या स्थानीय।
2 --deploy मोड चाहे ड्राइवर प्रोग्राम को स्थानीय रूप से लॉन्च करना हो ("क्लाइंट") या क्लस्टर ("क्लस्टर") के अंदर कार्यकर्ता मशीनों में से एक पर (डिफ़ॉल्ट: क्लाइंट)।
3 --कक्षा आपके एप्लिकेशन का मुख्य वर्ग (जावा / स्काला ऐप्स के लिए)।
4 --name आपके आवेदन का एक नाम।
5 --jars चालक और निष्पादक वर्गपथ पर शामिल करने के लिए स्थानीय जार की कोम्मा-अलग सूची।
6 --packages चालक और निष्पादक वर्गपथ पर शामिल करने के लिए जार के मावेन निर्देशांक की कोमा-पृथक सूची।
7 --repositories अतिरिक्त दूरस्थ रिपॉजिटरी की कोमा से अलग की गई सूची - पैकटों के साथ दिए गए मावेन निर्देशांक की खोज करने के लिए।
8 --py-फ़ाइलें पाइथन ऐप्स के लिए PYTHON PATH पर जगह पाने के लिए कोमा-अलग की सूची .zip, .egg, या .py फाइलें।
9 --files प्रत्येक निष्पादक की कार्यशील निर्देशिका में रखी जाने वाली फाइलों की कोमा से अलग सूची।
10 --conf (प्रोप = वैल) मनमाना स्पार्क विन्यास संपत्ति।
1 1 --properties-फ़ाइल एक फ़ाइल का पथ जिसमें से अतिरिक्त गुण लोड करना है। यदि निर्दिष्ट नहीं है, तो यह conf / स्पार्क-चूक के लिए दिखेगा।
12 --driver स्मृति ड्राइवर के लिए मेमोरी (जैसे 1000M, 2G) (डिफ़ॉल्ट: 512M)।
13 --driver-जावा-विकल्प ड्राइवर को पास करने के लिए अतिरिक्त जावा विकल्प।
14 --driver-पुस्तकालय-पथ ड्राइवर को पास करने के लिए अतिरिक्त लाइब्रेरी पथ प्रविष्टियाँ।
15 --driver स्तरीय पथ

ड्राइवर को पास करने के लिए अतिरिक्त क्लास पथ प्रविष्टियाँ।

ध्यान दें कि - जार के साथ जोड़े गए जार स्वचालित रूप से क्लासपाथ में शामिल हैं।

16 --executor स्मृति प्रति निष्पादन मेमोरी (जैसे 1000M, 2G) (डिफ़ॉल्ट: 1G)।
17 --proxy-उपयोगकर्ता एप्लिकेशन सबमिट करते समय उपयोगकर्ता को प्रतिरूपण करने के लिए।
18 -हेल्प, -ह यह सहायता संदेश दिखाएं और बाहर निकलें।
19 --वरबोस, -v अतिरिक्त डिबग आउटपुट प्रिंट करें।
20 --version वर्तमान स्पार्क के संस्करण को प्रिंट करें।
21 --driver-cores NUM ड्राइवर के लिए कोर (डिफ़ॉल्ट: 1)।
22 --supervise यदि दिया गया है, तो विफलता पर ड्राइवर को पुनरारंभ करें।
23 --kill यदि दिया जाता है, तो निर्दिष्ट ड्राइवर को मारता है।
24 --स्थिति यदि दिया गया है, तो निर्दिष्ट ड्राइवर की स्थिति का अनुरोध करता है।
25 --total-निष्पादक-कोर सभी निष्पादकों के लिए कुल कोर।
26 --executor-कोर प्रति निष्पादन कोर की संख्या। (डिफ़ॉल्ट: YARN मोड में 1, या स्टैंडअलोन मोड में कार्यकर्ता पर सभी उपलब्ध कोर)।