PySpark - Broadcast & Accumulator
W przypadku przetwarzania równoległego Apache Spark używa wspólnych zmiennych. Kopia współużytkowanej zmiennej trafia do każdego węzła klastra, gdy sterownik wysyła zadanie do modułu wykonawczego w klastrze, aby można go było użyć do wykonania zadań.
Istnieją dwa typy współdzielonych zmiennych obsługiwanych przez Apache Spark -
- Broadcast
- Accumulator
Rozumiemy je szczegółowo.
Nadawanie
Zmienne rozgłaszania służą do zapisywania kopii danych we wszystkich węzłach. Ta zmienna jest buforowana na wszystkich maszynach i nie jest wysyłana na maszynach z zadaniami. Poniższy blok kodu zawiera szczegóły klasy Broadcast dla PySpark.
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
Poniższy przykład pokazuje, jak używać zmiennej Broadcast. Zmienna Broadcast ma atrybut o nazwie value, który przechowuje dane i służy do zwracania wartości rozgłaszanej.
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - Polecenie dla zmiennej rozgłoszeniowej jest następujące -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - Dane wyjściowe dla następującego polecenia podano poniżej.
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
Akumulator
Zmienne akumulatorowe służą do agregowania informacji poprzez operacje asocjacyjne i przemienne. Na przykład można użyć akumulatora dla operacji sumarycznej lub liczników (w MapReduce). Poniższy blok kodu zawiera szczegóły klasy akumulatora dla PySpark.
class pyspark.Accumulator(aid, value, accum_param)
Poniższy przykład pokazuje, jak używać zmiennej Accumulator. Zmienna typu Accumulator ma atrybut zwany wartością, który jest podobny do tego, co ma zmienna rozgłoszeniowa. Przechowuje dane i służy do zwracania wartości akumulatora, ale można go używać tylko w programie sterownika.
W tym przykładzie zmienna akumulacyjna jest używana przez wielu pracowników i zwraca skumulowaną wartość.
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - Polecenie dla zmiennej akumulatora jest następujące -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - Dane wyjściowe dla powyższego polecenia podano poniżej.
Accumulated value is -> 150