PySpark - प्रसारण और संचायक
समानांतर प्रसंस्करण के लिए, अपाचे स्पार्क साझा चर का उपयोग करता है। साझा चर की एक प्रति क्लस्टर के प्रत्येक नोड पर जाती है जब चालक कार्य को क्लस्टर पर निष्पादक को भेजता है, ताकि इसका उपयोग कार्य करने के लिए किया जा सके।
Apache Spark द्वारा समर्थित दो प्रकार के साझा चर हैं -
- Broadcast
- Accumulator
आइए हम उन्हें विस्तार से समझते हैं।
प्रसारण
ब्रॉडकास्ट वैरिएबल का उपयोग सभी नोड्स में डेटा की कॉपी को बचाने के लिए किया जाता है। यह चर सभी मशीनों पर कैश किया गया है और कार्यों के साथ मशीनों पर नहीं भेजा गया है। निम्नलिखित कोड ब्लॉक में PySpark के लिए एक प्रसारण वर्ग का विवरण है।
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
निम्न उदाहरण दिखाता है कि प्रसारण चर का उपयोग कैसे करें। ब्रॉडकास्ट वैरिएबल में एक विशेषता होती है जिसे वैल्यू कहा जाता है, जो डेटा को स्टोर करता है और ब्रॉडकास्ट किए गए मान को वापस करने के लिए उपयोग किया जाता है।
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - एक प्रसारण चर के लिए कमान इस प्रकार है -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - निम्न कमांड के लिए आउटपुट नीचे दिया गया है।
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
बिजली संचयक यंत्र
Accumulator चर का उपयोग सूचनाओं को संबद्ध और कम्यूटेटिव ऑपरेशन के माध्यम से एकत्र करने के लिए किया जाता है। उदाहरण के लिए, आप एक राशि के लिए एक संचयक का उपयोग कर सकते हैं ऑपरेशन या काउंटर (MapReduce में)। निम्नलिखित कोड ब्लॉक में PySpark के लिए एक Accumulator वर्ग का विवरण है।
class pyspark.Accumulator(aid, value, accum_param)
निम्न उदाहरण दिखाता है कि Accumulator चर का उपयोग कैसे करें। एक Accumulator चर में एक विशेषता होती है जिसे मूल्य कहा जाता है जो एक प्रसारण चर के समान है। यह डेटा संग्रहीत करता है और संचायक के मान को वापस करने के लिए उपयोग किया जाता है, लेकिन केवल ड्राइवर प्रोग्राम में उपयोग करने योग्य है।
इस उदाहरण में, एक संचायक चर का उपयोग कई श्रमिकों द्वारा किया जाता है और एक संचित मूल्य देता है।
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - संचायक चर के लिए कमांड निम्नानुसार है -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - उपरोक्त कमांड के लिए आउटपुट नीचे दिया गया है।
Accumulated value is -> 150