PySpark - Yayın ve Akümülatör

Paralel işleme için Apache Spark, paylaşılan değişkenler kullanır. Paylaşılan değişkenin bir kopyası, sürücü küme üzerindeki yürütücüye bir görev gönderdiğinde kümenin her bir düğümüne gider, böylece görevler gerçekleştirmek için kullanılabilir.

Apache Spark tarafından desteklenen iki tür paylaşılan değişken vardır -

  • Broadcast
  • Accumulator

Onları detaylı olarak anlayalım.

Yayın yapmak

Yayın değişkenleri, verilerin kopyasını tüm düğümlerde kaydetmek için kullanılır. Bu değişken tüm makinelerde önbelleğe alınır ve görevleri olan makinelere gönderilmez. Aşağıdaki kod bloğu, PySpark için bir Yayın sınıfının ayrıntılarına sahiptir.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

Aşağıdaki örnek, bir Broadcast değişkeninin nasıl kullanılacağını gösterir. Yayın değişkeni, verileri depolayan ve yayınlanan bir değeri döndürmek için kullanılan değer adlı bir niteliğe sahiptir.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - Bir yayın değişkeninin komutu aşağıdaki gibidir -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - Aşağıdaki komutun çıktısı aşağıda verilmiştir.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Akümülatör

Biriktirici değişkenler, ilişkisel ve değişmeli işlemler aracılığıyla bilgileri toplamak için kullanılır. Örneğin, toplama işlemi veya sayaçlar için bir toplayıcı kullanabilirsiniz (MapReduce'da). Aşağıdaki kod bloğu, PySpark için bir Akümülatör sınıfının ayrıntılarına sahiptir.

class pyspark.Accumulator(aid, value, accum_param)

Aşağıdaki örnek, bir Akümülatör değişkeninin nasıl kullanılacağını gösterir. Bir Akümülatör değişkeni, bir yayın değişkeninin sahip olduğu şeye benzer değer adı verilen bir niteliğe sahiptir. Verileri depolar ve akümülatörün değerini döndürmek için kullanılır, ancak yalnızca bir sürücü programında kullanılabilir.

Bu örnekte, bir biriktirici değişkeni birden çok çalışan tarafından kullanılır ve birikmiş bir değer döndürür.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - Bir toplayıcı değişken için komut aşağıdaki gibidir -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - Yukarıdaki komutun çıktısı aşağıda verilmiştir.

Accumulated value is -> 150