PySpark - Broadcast & Accumulator

W przypadku przetwarzania równoległego Apache Spark używa wspólnych zmiennych. Kopia współużytkowanej zmiennej trafia do każdego węzła klastra, gdy sterownik wysyła zadanie do modułu wykonawczego w klastrze, aby można go było użyć do wykonania zadań.

Istnieją dwa typy współdzielonych zmiennych obsługiwanych przez Apache Spark -

  • Broadcast
  • Accumulator

Rozumiemy je szczegółowo.

Nadawanie

Zmienne rozgłaszania służą do zapisywania kopii danych we wszystkich węzłach. Ta zmienna jest buforowana na wszystkich maszynach i nie jest wysyłana na maszynach z zadaniami. Poniższy blok kodu zawiera szczegóły klasy Broadcast dla PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

Poniższy przykład pokazuje, jak używać zmiennej Broadcast. Zmienna Broadcast ma atrybut o nazwie value, który przechowuje dane i służy do zwracania wartości rozgłaszanej.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - Polecenie dla zmiennej rozgłoszeniowej jest następujące -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - Dane wyjściowe dla następującego polecenia podano poniżej.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Akumulator

Zmienne akumulatorowe służą do agregowania informacji poprzez operacje asocjacyjne i przemienne. Na przykład można użyć akumulatora dla operacji sumarycznej lub liczników (w MapReduce). Poniższy blok kodu zawiera szczegóły klasy akumulatora dla PySpark.

class pyspark.Accumulator(aid, value, accum_param)

Poniższy przykład pokazuje, jak używać zmiennej Accumulator. Zmienna typu Accumulator ma atrybut zwany wartością, który jest podobny do tego, co ma zmienna rozgłoszeniowa. Przechowuje dane i służy do zwracania wartości akumulatora, ale można go używać tylko w programie sterownika.

W tym przykładzie zmienna akumulacyjna jest używana przez wielu pracowników i zwraca skumulowaną wartość.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - Polecenie dla zmiennej akumulatora jest następujące -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - Dane wyjściowe dla powyższego polecenia podano poniżej.

Accumulated value is -> 150