PySpark - त्वरित गाइड

इस अध्याय में, हम खुद को परिचित करेंगे कि अपाचे स्पार्क क्या है और पाइस्पार्क को कैसे विकसित किया गया था।

स्पार्क - अवलोकन

अपाचे स्पार्क एक लाइटनिंग फास्ट रियल-टाइम प्रोसेसिंग फ्रेमवर्क है। यह वास्तविक समय में डेटा का विश्लेषण करने के लिए स्मृति संगणना करता है। यह तस्वीर के रूप में आया थाApache Hadoop MapReduceकेवल बैच प्रसंस्करण प्रदर्शन कर रहा था और वास्तविक समय प्रसंस्करण सुविधा का अभाव था। इसलिए, अपाचे स्पार्क को पेश किया गया क्योंकि यह वास्तविक समय में स्ट्रीम प्रोसेसिंग कर सकता है और बैच प्रोसेसिंग का भी ध्यान रख सकता है।

रियल-टाइम और बैच प्रोसेसिंग के अलावा, अपाचे स्पार्क इंटरएक्टिव प्रश्नों और पुनरावृत्ति एल्गोरिदम का भी समर्थन करता है। अपाचे स्पार्क का अपना क्लस्टर मैनेजर है, जहां वह अपने एप्लिकेशन को होस्ट कर सकता है। यह स्टोरेज और प्रोसेसिंग दोनों के लिए Apache Hadoop का लाभ उठाता है। यह उपयोगकर्ता हैHDFS (Hadoop Distributed File System) स्टोरेज के लिए और इस पर स्पार्क एप्लिकेशन चला सकते हैं YARN भी।

PySpark - अवलोकन

अपाचे स्पार्क में लिखा है Scala programming language। स्पार्क के साथ पायथन का समर्थन करने के लिए, अपाचे स्पार्क समुदाय ने एक उपकरण, पायस्पार्क जारी किया। PySpark का उपयोग करके, आप के साथ काम कर सकते हैंRDDsपायथन प्रोग्रामिंग भाषा में भी। यह एक पुस्तकालय की वजह से कहा जाता हैPy4j कि वे इसे प्राप्त करने में सक्षम हैं।

PySpark प्रदान करता है PySpark Shellजो पायथन एपीआई को स्पार्क कोर से जोड़ता है और स्पार्क संदर्भ को इनिशियलाइज़ करता है। डेटा वैज्ञानिकों और एनालिटिक्स के अधिकांश विशेषज्ञ आज अपने समृद्ध पुस्तकालय सेट के कारण अजगर का उपयोग करते हैं। स्पार्क के साथ अजगर को एकीकृत करना उनके लिए एक वरदान है।

इस अध्याय में, हम PySpark के पर्यावरण सेटअप को समझेंगे।

Note - यह विचार कर रहा है कि आपके कंप्यूटर पर जावा और स्काला स्थापित है।

अब हम निम्नलिखित चरणों के साथ PySpark को डाउनलोड और सेट करते हैं।

Step 1- आधिकारिक अपाचे स्पार्क डाउनलोड पेज पर जाएं और वहां उपलब्ध अपाचे स्पार्क के नवीनतम संस्करण को डाउनलोड करें। इस ट्यूटोरियल में, हम उपयोग कर रहे हैंspark-2.1.0-bin-hadoop2.7

Step 2- अब, डाउनलोड की गई स्पार्क टार फ़ाइल को निकालें। डिफ़ॉल्ट रूप से, यह डाउनलोड निर्देशिका में डाउनलोड हो जाएगा।

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

यह एक डायरेक्टरी बनाएगा spark-2.1.0-bin-hadoop2.7। PySpark शुरू करने से पहले, आपको स्पार्क पथ और को सेट करने के लिए निम्न वातावरण सेट करना होगाPy4j path

export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

या, उपरोक्त वातावरण को विश्व स्तर पर स्थापित करने के लिए, उन्हें अंदर रखें .bashrc file। फिर काम करने के लिए वातावरण के लिए निम्न आदेश चलाएँ।

# source .bashrc

अब जब हमारे पास सभी वातावरण सेट हैं, तो हम स्पार्क डायरेक्टरी में जाते हैं और निम्नलिखित कमांड को चलाकर PySpark शेल को आमंत्रित करते हैं -

# ./bin/pyspark

इससे आपका PySpark खोल शुरू हो जाएगा।

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

SparkContext किसी भी स्पार्क कार्यक्षमता के लिए प्रवेश बिंदु है। जब हम कोई स्पार्क एप्लिकेशन चलाते हैं, तो एक ड्राइवर प्रोग्राम शुरू होता है, जिसमें मुख्य कार्य होता है और आपका स्पार्क कॉन्टेक्स्ट यहां शुरू हो जाता है। चालक कार्यक्रम तब कर्मी नोड्स पर निष्पादकों के अंदर संचालन चलाता है।

SparkContext लॉन्च करने के लिए Py4J का उपयोग करता है JVM और एक बनाता है JavaSparkContext। डिफ़ॉल्ट रूप से, PySpark में SparkContext उपलब्ध है‘sc’, इसलिए एक नया SparkContext बनाने से काम नहीं चलेगा।

निम्नलिखित कोड ब्लॉक में एक PySpark वर्ग और मापदंडों का विवरण है, जो एक SparkContext ले सकता है।

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

मापदंडों

निम्नलिखित एक स्पार्ककनेक्ट के पैरामीटर हैं।

  • Master - यह उस क्लस्टर का URL है जिसे वह जोड़ता है।

  • appName - आपकी नौकरी का नाम।

  • sparkHome - स्पार्क इंस्टॉलेशन डायरेक्टरी।

  • pyFiles - .zip या .py फाइलें क्लस्टर में भेजने के लिए और PYTHONPATH में जोड़ें।

  • Environment - कार्यकर्ता पर्यावरण चर को नोड करता है।

  • batchSize- पायथन ऑब्जेक्ट्स की संख्या को एकल जावा ऑब्जेक्ट के रूप में दर्शाया गया है। बैचिंग को अक्षम करने के लिए 1 सेट करें, ऑब्जेक्ट आकार के आधार पर बैच आकार को स्वचालित रूप से चुनने के लिए 0 या असीमित बैच आकार का उपयोग करने के लिए -1।

  • Serializer - आरडीडी धारावाहिक।

  • Conf - सभी स्पार्क गुणों को निर्धारित करने के लिए L {SparkConf} की एक वस्तु।

  • Gateway - एक मौजूदा गेटवे और JVM का उपयोग करें, अन्यथा एक नए JVM को इनिशियलाइज़ करें।

  • JSC - JavaSparkContext उदाहरण।

  • profiler_cls - कस्टम प्रोफाइलर का एक वर्ग प्रोफाइलिंग करता था (डिफ़ॉल्ट pyspark.profiler.BasicProfiler है)।

उपरोक्त मापदंडों के बीच, master तथा appnameज्यादातर उपयोग किया जाता है। किसी भी PySpark कार्यक्रम की पहली दो पंक्तियाँ नीचे दी गई हैं -

from pyspark import SparkContext
sc = SparkContext("local", "First App")

स्पार्ककनेक्ट उदाहरण - पाइस्पार्क शेल

अब जब आप SparkContext के बारे में पर्याप्त जानते हैं, तो आइए PySpark शेल पर एक सरल उदाहरण चलाते हैं। इस उदाहरण में, हम वर्णों की संख्या को 'a' या 'b' के साथ गिनेंगेREADME.mdफ़ाइल। तो, हम कहते हैं कि अगर किसी फ़ाइल में 5 लाइनें हैं और 3 लाइनों में वर्ण 'a' है, तो आउटपुट → होगाLine with a: 3। चरित्र 'बी' के लिए भी ऐसा ही किया जाएगा।

Note- हम निम्नलिखित उदाहरण में कोई स्पार्ककॉन्टेक्ट ऑब्जेक्ट नहीं बना रहे हैं क्योंकि डिफ़ॉल्ट रूप से, स्पार्क स्वचालित रूप से स्पार्ककॉन्टेक्ट ऑब्जेक्ट को sc नाम देता है, जब PySpark शेल शुरू होता है। यदि आप एक और स्पार्क कॉन्टेक्स्ट ऑब्जेक्ट बनाने की कोशिश करते हैं, तो आपको निम्न त्रुटि मिलेगी -"ValueError: Cannot run multiple SparkContexts at once".

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

स्पार्क कॉन्टेक्स्ट उदाहरण - पायथन प्रोग्राम

आइए हम पायथन प्रोग्राम का उपयोग करके एक ही उदाहरण चलाते हैं। एक पायथन फ़ाइल बनाएँ जिसे कहा जाता हैfirstapp.py और उस फ़ाइल में निम्न कोड दर्ज करें।

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

तब हम इस पायथन फ़ाइल को चलाने के लिए टर्मिनल में निम्नलिखित कमांड निष्पादित करेंगे। हमें ऊपर जैसा ही आउटपुट मिलेगा।

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

अब जब हमने अपने सिस्टम पर PySpark को स्थापित और कॉन्फ़िगर किया है, तो हम Apache Spark पर Python में प्रोग्राम कर सकते हैं। हालांकि ऐसा करने से पहले, आइए स्पार्क - आरडीडी में एक बुनियादी अवधारणा को समझें।

RDD का अर्थ है Resilient Distributed Dataset, ये एक क्लस्टर पर समानांतर प्रसंस्करण करने के लिए कई नोड्स पर चलने और संचालित करने वाले तत्व हैं। RDD अपरिवर्तनीय तत्व हैं, जिसका अर्थ है कि एक बार RDD बनाने के बाद आप इसे बदल नहीं सकते हैं। RDD गलत सहिष्णु होते हैं, इसलिए किसी भी विफलता के मामले में, वे अपने आप ठीक हो जाते हैं। आप एक निश्चित कार्य को प्राप्त करने के लिए इन RDDs पर कई ऑपरेशन लागू कर सकते हैं।

इन RDD पर परिचालन लागू करने के लिए, दो तरीके हैं -

  • परिवर्तन और
  • Action

आइए इन दो तरीकों को विस्तार से समझते हैं।

Transformation- ये ऑपरेशन हैं, जो RDD पर नए RDD बनाने के लिए लगाए जाते हैं। फ़िल्टर, GroupBy और मानचित्र परिवर्तनों के उदाहरण हैं।

Action - ये वे ऑपरेशन हैं जो आरडीडी पर लागू होते हैं, जो स्पार्क को संगणना करने और परिणाम को वापस चालक को भेजने का निर्देश देता है।

PySpark में किसी भी ऑपरेशन को लागू करने के लिए, हमें एक बनाने की आवश्यकता है PySpark RDDप्रथम। निम्नलिखित कोड ब्लॉक में एक PySpark RDD क्लास का विवरण है -

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

आइए देखते हैं कि PySpark का उपयोग करके कुछ बुनियादी ऑपरेशन कैसे चलाएं। पायथन फ़ाइल में निम्न कोड RDD शब्द बनाता है, जो उल्लिखित शब्दों के एक समूह को संग्रहीत करता है।

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

अब हम शब्दों पर कुछ ऑपरेशन चलाएंगे।

गिनती ()

RDD में तत्वों की संख्या वापस आ गई है।

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - गिनती के लिए कमांड () है -

$SPARK_HOME/bin/spark-submit count.py

Output - उपरोक्त कमांड के लिए आउटपुट है -

Number of elements in RDD → 8

इकट्ठा ()

RDD में सभी तत्व वापस आ गए हैं।

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - संग्रह के लिए कमांड () है -

$SPARK_HOME/bin/spark-submit collect.py

Output - उपरोक्त कमांड के लिए आउटपुट है -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (च)

केवल उन्हीं तत्वों को लौटाता है जो फ़ॉर्चे के अंदर फ़ंक्शन की स्थिति से मिलते हैं। निम्नलिखित उदाहरण में, हम फॉर्च में एक प्रिंट फ़ंक्शन कहते हैं, जो RDD में सभी तत्वों को प्रिंट करता है।

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - foreach (f) के लिए कमांड है -

$SPARK_HOME/bin/spark-submit foreach.py

Output - उपरोक्त कमांड के लिए आउटपुट है -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

फिल्टर (च)

तत्वों के साथ एक नया आरडीडी लौटाया जाता है, जो फ़िल्टर के अंदर फ़ंक्शन को संतुष्ट करता है। निम्नलिखित उदाहरण में, हम 'स्पार्क' वाले तारों को छानते हैं।

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - फ़िल्टर (f) के लिए कमांड है -

$SPARK_HOME/bin/spark-submit filter.py

Output - उपरोक्त कमांड के लिए आउटपुट है -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

नक्शा (च, परिरक्षण

RDD में प्रत्येक तत्व के लिए एक फ़ंक्शन लागू करके एक नया RDD लौटाया जाता है। निम्नलिखित उदाहरण में, हम एक महत्वपूर्ण मान युग्म बनाते हैं और प्रत्येक स्ट्रिंग को 1 के मान के साथ मैप करते हैं।

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - मानचित्र (च, परिरक्षण = गलत) के लिए आदेश है -

$SPARK_HOME/bin/spark-submit map.py

Output - उपरोक्त कमांड का आउटपुट है -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

को कम करने (च)

निर्दिष्ट कम्यूटेटिव और साहचर्य बाइनरी ऑपरेशन करने के बाद, RDD में तत्व वापस आ गया है। निम्नलिखित उदाहरण में, हम ऑपरेटर से पैकेज जोड़ रहे हैं और इसे एक सरल जोड़ ऑपरेशन करने के लिए 'संख्या' पर लागू कर रहे हैं।

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - कमी (f) के लिए कमांड है -

$SPARK_HOME/bin/spark-submit reduce.py

Output - उपरोक्त कमांड का आउटपुट है -

Adding all the elements -> 15

शामिल हों (अन्य, संख्या = कोई नहीं)

यह RDD से मेल खाता कुंजी के साथ तत्वों की एक जोड़ी और उस विशेष कुंजी के लिए सभी मान देता है। निम्नलिखित उदाहरण में, दो अलग-अलग RDD में दो जोड़ी तत्व हैं। इन दो RDDs से जुड़ने के बाद, हमें RDD मिलते हैं जिनमें तत्वों की मिलान कुंजी और उनके मान होते हैं।

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - ज्वाइन (अन्य, अंक = कोई नहीं) के लिए कमांड है -

$SPARK_HOME/bin/spark-submit join.py

Output - उपरोक्त कमांड के लिए आउटपुट है -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

कैश ()

डिफ़ॉल्ट भंडारण स्तर (MEMORY_ONLY) के साथ इस RDD को जारी रखें। आप यह भी देख सकते हैं कि RDD कैश है या नहीं।

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - कैश के लिए कमांड () है -

$SPARK_HOME/bin/spark-submit cache.py

Output - उपरोक्त कार्यक्रम के लिए आउटपुट है -

Words got cached -> True

ये कुछ सबसे महत्वपूर्ण ऑपरेशन थे जो PySpark RDD पर किए गए थे।

समानांतर प्रसंस्करण के लिए, अपाचे स्पार्क साझा चर का उपयोग करता है। साझा चर की एक प्रति क्लस्टर के प्रत्येक नोड पर जाती है जब चालक कार्य को क्लस्टर पर निष्पादक को भेजता है, ताकि इसका उपयोग कार्य करने के लिए किया जा सके।

Apache Spark द्वारा समर्थित दो प्रकार के साझा चर हैं -

  • Broadcast
  • Accumulator

आइए हम उन्हें विस्तार से समझते हैं।

प्रसारण

ब्रॉडकास्ट वैरिएबल का उपयोग सभी नोड्स में डेटा की कॉपी को बचाने के लिए किया जाता है। यह चर सभी मशीनों पर कैश किया गया है और कार्यों के साथ मशीनों पर नहीं भेजा गया है। निम्नलिखित कोड ब्लॉक में PySpark के लिए एक प्रसारण वर्ग का विवरण है।

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

निम्न उदाहरण दिखाता है कि प्रसारण चर का उपयोग कैसे करें। ब्रॉडकास्ट वैरिएबल में एक विशेषता होती है जिसे वैल्यू कहा जाता है, जो डेटा को स्टोर करता है और ब्रॉडकास्ट किए गए मान को वापस करने के लिए उपयोग किया जाता है।

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - एक प्रसारण चर के लिए कमान इस प्रकार है -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - निम्न कमांड के लिए आउटपुट नीचे दिया गया है।

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

बिजली संचयक यंत्र

Accumulator चर का उपयोग सूचनाओं को संबद्ध और कम्यूटेटिव ऑपरेशन के माध्यम से एकत्र करने के लिए किया जाता है। उदाहरण के लिए, आप एक राशि के लिए एक संचयक का उपयोग कर सकते हैं ऑपरेशन या काउंटर (MapReduce में)। निम्नलिखित कोड ब्लॉक में PySpark के लिए एक Accumulator वर्ग का विवरण है।

class pyspark.Accumulator(aid, value, accum_param)

निम्न उदाहरण दिखाता है कि Accumulator चर का उपयोग कैसे करें। एक Accumulator चर में एक विशेषता होती है जिसे मूल्य कहा जाता है जो एक प्रसारण चर के समान है। यह डेटा संग्रहीत करता है और संचायक के मान को वापस करने के लिए उपयोग किया जाता है, लेकिन केवल ड्राइवर प्रोग्राम में उपयोग करने योग्य है।

इस उदाहरण में, एक संचायक चर का उपयोग कई श्रमिकों द्वारा किया जाता है और एक संचित मूल्य देता है।

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - संचायक चर के लिए कमांड निम्नानुसार है -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - उपरोक्त कमांड के लिए आउटपुट नीचे दिया गया है।

Accumulated value is -> 150

स्थानीय / क्लस्टर पर स्पार्क एप्लिकेशन को चलाने के लिए, आपको कुछ कॉन्फ़िगरेशन और पैरामीटर सेट करने की आवश्यकता होती है, यही स्पार्ककॉन्फ़ मदद करता है। यह एक स्पार्क एप्लिकेशन को चलाने के लिए कॉन्फ़िगरेशन प्रदान करता है। निम्नलिखित कोड ब्लॉक में PySpark के लिए एक SparkConf वर्ग का विवरण है।

class pyspark.SparkConf (
   loadDefaults = True, 
   _jvm = None, 
   _jconf = None
)

प्रारंभ में, हम स्पार्ककॉन्फ़ () के साथ एक स्पार्ककॉन्फ़ ऑब्जेक्ट बनाएंगे, जो मूल्यों को लोड करेगा spark.*जावा सिस्टम गुण भी। अब आप स्पार्ककॉन्फ ऑब्जेक्ट का उपयोग करके विभिन्न पैरामीटर सेट कर सकते हैं और उनके पैरामीटर सिस्टम गुणों पर प्राथमिकता लेंगे।

स्पार्ककॉन्फ़ क्लास में, सेटर विधियाँ होती हैं, जो चैनिंग का समर्थन करती हैं। उदाहरण के लिए, आप लिख सकते हैंconf.setAppName(“PySpark App”).setMaster(“local”)। एक बार जब हम Apache Spark के लिए एक SparkConf ऑब्जेक्ट पास करते हैं, तो इसे किसी भी उपयोगकर्ता द्वारा संशोधित नहीं किया जा सकता है।

SparkConf की सबसे अधिक इस्तेमाल की जाने वाली विशेषताओं में से कुछ निम्नलिखित हैं -

  • set(key, value) - एक विन्यास संपत्ति सेट करने के लिए।

  • setMaster(value) - मास्टर URL सेट करने के लिए।

  • setAppName(value) - एक आवेदन नाम सेट करने के लिए।

  • get(key, defaultValue=None) - एक कुंजी का कॉन्फ़िगरेशन मान प्राप्त करने के लिए।

  • setSparkHome(value) - वर्कर नोड्स पर स्पार्क इंस्टॉलेशन पाथ सेट करना।

आइए पायस्पार्क कार्यक्रम में स्पार्ककोनफ का उपयोग करने के निम्नलिखित उदाहरण पर विचार करें। इस उदाहरण में, हम स्पार्क एप्लिकेशन नाम के रूप में सेट कर रहे हैंPySpark App और → करने के लिए एक स्पार्क आवेदन के लिए मास्टर यूआरएल की स्थापना spark://master:7077

निम्न कोड ब्लॉक में लाइनें हैं, जब वे पायथन फ़ाइल में जुड़ जाते हैं, तो यह PySpon एप्लिकेशन को चलाने के लिए मूल कॉन्फ़िगरेशन सेट करता है।

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

अपाचे स्पार्क में, आप अपनी फ़ाइलों का उपयोग करके अपलोड कर सकते हैं sc.addFile (sc आपकी डिफ़ॉल्ट स्पार्क कॉन्टेक्स्ट है) और प्रयोग करने वाले किसी कार्यकर्ता पर पथ प्राप्त करें SparkFiles.get। इस प्रकार, स्पार्कफाइल्स के माध्यम से जोड़ी गई फ़ाइलों के लिए पथ को हल करते हैंSparkContext.addFile()

स्पार्कफाइल्स में निम्न वर्गमिथोड होते हैं -

  • get(filename)
  • getrootdirectory()

आइए हम उन्हें विस्तार से समझते हैं।

मिल (फ़ाइल का नाम)

यह SparkContext.addFile () के माध्यम से जोड़ी गई फ़ाइल का पथ निर्दिष्ट करता है।

getrootdirectory ()

यह रूट निर्देशिका के लिए पथ निर्दिष्ट करता है, जिसमें SparkContext.addFile () के माध्यम से जोड़ा गया फ़ाइल शामिल है।

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

Command - कमांड इस प्रकार है -

$SPARK_HOME/bin/spark-submit sparkfiles.py

Output - उपरोक्त कमांड के लिए आउटपुट है -

Absolute Path -> 
   /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

StorageLevel तय करता है कि RDD को कैसे संग्रहीत किया जाना चाहिए। अपाचे स्पार्क में, StorageLevel यह तय करता है कि RDD को मेमोरी में संग्रहीत किया जाना चाहिए या इसे डिस्क पर संग्रहीत किया जाना चाहिए, या दोनों। यह भी तय करता है कि क्या आरडीडी को क्रमबद्ध करना है और क्या आरडीडी विभाजन को दोहराया जाना है।

निम्नलिखित कोड ब्लॉक में StorageLevel की वर्ग परिभाषा है -

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

अब, RDD के भंडारण का निर्णय लेने के लिए, विभिन्न भंडारण स्तर हैं, जो नीचे दिए गए हैं -

  • DISK_ONLY = स्टोरेजवेल (सच, गलत, गलत, गलत, 1)

  • DISK_ONLY_2 = स्टोरेजवेल (सच, गलत, गलत, गलत, 2)

  • MEMORY_AND_DISK = स्टोरेजवेल (सच, सच, गलत, गलत, 1)

  • MEMORY_AND_DISK_2 = स्टोरेजवेल (सच, सच, गलत, गलत, 2)

  • MEMORY_AND_DISK_SER = स्टोरेजवेल (सच, सच, गलत, गलत, 1)

  • MEMORY_AND_DISK_SER_2 = स्टोरेजवेल (सच, सच, गलत, गलत, 2)

  • MEMORY_ONLY = स्टोरेजवेल (झूठी, सच्ची, झूठी, झूठी, १)

  • MEMORY_ONLY_2 = स्टोरेजवेल (गलत, सच, गलत, गलत, 2)

  • MEMORY_ONLY_SER = स्टोरेजवेल (झूठी, सच्ची, झूठी, झूठी, १)

  • MEMORY_ONLY_SER_2 = स्टोरेजवेल (गलत, सच, गलत, गलत, 2)

  • OFF_HEAP = स्टोरेजवेल (सच, सच्चा, सच्चा, गलत, 1)

हमें StorageLevel के निम्नलिखित उदाहरण पर विचार करें, जहां हम भंडारण स्तर का उपयोग करते हैं MEMORY_AND_DISK_2, जिसका मतलब है कि RDD विभाजन में 2 की प्रतिकृति होगी।

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local", 
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

Command - कमांड इस प्रकार है -

$SPARK_HOME/bin/spark-submit storagelevel.py

Output - उपरोक्त कमांड के लिए आउटपुट नीचे दिया गया है -

Disk Memory Serialized 2x Replicated

Apache Spark नामक एक मशीन लर्निंग एपीआई प्रदान करता है MLlib। पाइस्पार्क में पाइथन के रूप में यह मशीन लर्निंग एपीआई है। यह विभिन्न प्रकार के एल्गोरिदम का समर्थन करता है, जो नीचे उल्लिखित हैं -

  • mllib.classification - spark.mllibपैकेज द्विआधारी वर्गीकरण, मल्टीस्केल्स वर्गीकरण और प्रतिगमन विश्लेषण के लिए विभिन्न तरीकों का समर्थन करता है। वर्गीकरण में सबसे लोकप्रिय एल्गोरिदम में से कुछ हैंRandom Forest, Naive Bayes, Decision Tree, आदि।

  • mllib.clustering - क्लस्टरिंग एक अनसुलझी सीखने की समस्या है, जिसके तहत आप समानता की कुछ धारणा के आधार पर एक दूसरे के साथ संस्थाओं के सबसेट समूह का लक्ष्य रखते हैं।

  • mllib.fpm- बार-बार पैटर्न का मिलान लगातार आइटम, आइटम, परवर्ती या अन्य सबस्ट्रक्चर को माइन कर रहा है जो आमतौर पर बड़े पैमाने पर डेटासेट का विश्लेषण करने वाले पहले चरणों में से हैं। यह वर्षों से डेटा माइनिंग में एक सक्रिय शोध विषय रहा है।

  • mllib.linalg - रैखिक बीजगणित के लिए एमएललिब उपयोगिताओं।

  • mllib.recommendation- कोलैबोरेटिव फ़िल्टरिंग का उपयोग आमतौर पर सिफारिशकर्ता सिस्टम के लिए किया जाता है। इन तकनीकों का उद्देश्य उपयोगकर्ता आइटम एसोसिएशन मैट्रिक्स की अनुपलब्ध प्रविष्टियों को भरना है।

  • spark.mllib- यह आदर्श रूप से मॉडल-आधारित सहयोगी फ़िल्टरिंग का समर्थन करता है, जिसमें उपयोगकर्ताओं और उत्पादों को अव्यक्त कारकों के एक छोटे सेट द्वारा वर्णित किया जाता है, जिनका उपयोग लापता प्रविष्टियों की भविष्यवाणी करने के लिए किया जा सकता है। स्पार्क.म्लिब इन अव्यक्त कारकों को सीखने के लिए अल्टरनेटिंग लिस्ट स्क्वायर (एएलएस) एल्गोरिथ्म का उपयोग करता है।

  • mllib.regression- रेखीय प्रतिगमन प्रतिगमन एल्गोरिदम के परिवार से संबंधित है। प्रतिगमन का लक्ष्य चरों के बीच संबंधों और निर्भरता का पता लगाना है। रैखिक प्रतिगमन मॉडल और मॉडल सारांश के साथ काम करने के लिए इंटरफ़ेस लॉजिस्टिक प्रतिगमन मामले के समान है।

अन्य एल्गोरिदम, कक्षाएं और कार्य भी mllib पैकेज के एक भाग के रूप में हैं। अब तक, हम एक प्रदर्शन को समझते हैंpyspark.mllib

निम्न उदाहरण ALS एल्गोरिथ्म का उपयोग करते हुए सिफारिशी मॉडल बनाने और प्रशिक्षण डेटा पर मूल्यांकन करने के लिए सहयोगी फ़िल्टरिंग का है।

Dataset used - test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

Command - कमांड इस प्रकार होगी -

$SPARK_HOME/bin/spark-submit recommend.py

Output - उपरोक्त कमांड का आउटपुट होगा -

Mean Squared Error = 1.20536041839e-05

Serialization का उपयोग Apache Spark पर प्रदर्शन ट्यूनिंग के लिए किया जाता है। सभी डेटा जो नेटवर्क पर भेजे जाते हैं या डिस्क पर लिखे जाते हैं या मेमोरी में बने रहते हैं, उन्हें क्रमबद्ध किया जाना चाहिए। महंगा परिचालन में सीरियलाइजेशन एक महत्वपूर्ण भूमिका निभाता है।

PySpark प्रदर्शन ट्यूनिंग के लिए कस्टम धारावाहिकों का समर्थन करता है। निम्नलिखित दो धारावाहिकों को PySpark द्वारा समर्थित किया जाता है -

MarshalSerializer

पायथन के मार्शल सीरियलाइज़र का उपयोग करके वस्तुओं को सीरियल करता है। यह धारावाहिक अचार से बेहतर है, लेकिन यह कम डेटाैटिप्स का समर्थन करता है।

class pyspark.MarshalSerializer

PickleSerializer

पायथन के अचार सीरीज़ाइज़र का उपयोग करके वस्तुओं को सीरियल करता है। यह धारावाहिक लगभग किसी भी पायथन ऑब्जेक्ट का समर्थन करता है, लेकिन अधिक विशिष्ट धारावाहिकों के रूप में तेज़ नहीं हो सकता है।

class pyspark.PickleSerializer

आइये PySpark क्रमांकन पर एक उदाहरण देखते हैं। यहां, हम मार्शलशरीरलाइज़र का उपयोग करके डेटा को क्रमबद्ध करते हैं।

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

Command - कमांड इस प्रकार है -

$SPARK_HOME/bin/spark-submit serializing.py

Output - उपरोक्त कमांड का आउटपुट है -

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]