PySpark - Yayın ve Akümülatör
Paralel işleme için Apache Spark, paylaşılan değişkenler kullanır. Paylaşılan değişkenin bir kopyası, sürücü küme üzerindeki yürütücüye bir görev gönderdiğinde kümenin her bir düğümüne gider, böylece görevler gerçekleştirmek için kullanılabilir.
Apache Spark tarafından desteklenen iki tür paylaşılan değişken vardır -
- Broadcast
- Accumulator
Onları detaylı olarak anlayalım.
Yayın yapmak
Yayın değişkenleri, verilerin kopyasını tüm düğümlerde kaydetmek için kullanılır. Bu değişken tüm makinelerde önbelleğe alınır ve görevleri olan makinelere gönderilmez. Aşağıdaki kod bloğu, PySpark için bir Yayın sınıfının ayrıntılarına sahiptir.
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
Aşağıdaki örnek, bir Broadcast değişkeninin nasıl kullanılacağını gösterir. Yayın değişkeni, verileri depolayan ve yayınlanan bir değeri döndürmek için kullanılan değer adlı bir niteliğe sahiptir.
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - Bir yayın değişkeninin komutu aşağıdaki gibidir -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - Aşağıdaki komutun çıktısı aşağıda verilmiştir.
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
Akümülatör
Biriktirici değişkenler, ilişkisel ve değişmeli işlemler aracılığıyla bilgileri toplamak için kullanılır. Örneğin, toplama işlemi veya sayaçlar için bir toplayıcı kullanabilirsiniz (MapReduce'da). Aşağıdaki kod bloğu, PySpark için bir Akümülatör sınıfının ayrıntılarına sahiptir.
class pyspark.Accumulator(aid, value, accum_param)
Aşağıdaki örnek, bir Akümülatör değişkeninin nasıl kullanılacağını gösterir. Bir Akümülatör değişkeni, bir yayın değişkeninin sahip olduğu şeye benzer değer adı verilen bir niteliğe sahiptir. Verileri depolar ve akümülatörün değerini döndürmek için kullanılır, ancak yalnızca bir sürücü programında kullanılabilir.
Bu örnekte, bir biriktirici değişkeni birden çok çalışan tarafından kullanılır ve birikmiş bir değer döndürür.
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - Bir toplayıcı değişken için komut aşağıdaki gibidir -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - Yukarıdaki komutun çıktısı aşağıda verilmiştir.
Accumulated value is -> 150