PySpark - Broadcast & Accumulator
Für die parallele Verarbeitung verwendet Apache Spark gemeinsam genutzte Variablen. Eine Kopie der gemeinsam genutzten Variablen wird auf jedem Knoten des Clusters gespeichert, wenn der Treiber eine Aufgabe an den Executor im Cluster sendet, damit sie zum Ausführen von Aufgaben verwendet werden kann.
Es gibt zwei Arten von gemeinsam genutzten Variablen, die von Apache Spark unterstützt werden:
- Broadcast
- Accumulator
Lassen Sie uns sie im Detail verstehen.
Übertragung
Broadcast-Variablen werden verwendet, um die Kopie der Daten auf allen Knoten zu speichern. Diese Variable wird auf allen Computern zwischengespeichert und nicht auf Computern mit Aufgaben gesendet. Der folgende Codeblock enthält die Details einer Broadcast-Klasse für PySpark.
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
Das folgende Beispiel zeigt, wie eine Broadcast-Variable verwendet wird. Eine Broadcast-Variable hat ein Attribut namens value, das die Daten speichert und zur Rückgabe eines Broadcast-Werts verwendet wird.
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - Der Befehl für eine Broadcast-Variable lautet wie folgt: -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - Die Ausgabe für den folgenden Befehl ist unten angegeben.
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
Akkumulator
Akkumulatorvariablen werden zum Aggregieren der Informationen durch assoziative und kommutative Operationen verwendet. Beispielsweise können Sie einen Akkumulator für eine Summenoperation oder Zähler verwenden (in MapReduce). Der folgende Codeblock enthält die Details einer Accumulator-Klasse für PySpark.
class pyspark.Accumulator(aid, value, accum_param)
Das folgende Beispiel zeigt, wie eine Akkumulatorvariable verwendet wird. Eine Akkumulatorvariable hat ein Attribut namens value, das dem einer Broadcastvariablen ähnelt. Es speichert die Daten und wird verwendet, um den Wert des Akkumulators zurückzugeben, kann jedoch nur in einem Treiberprogramm verwendet werden.
In diesem Beispiel wird eine Akkumulatorvariable von mehreren Workern verwendet und gibt einen akkumulierten Wert zurück.
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - Der Befehl für eine Akkumulatorvariable lautet wie folgt: -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - Die Ausgabe für den obigen Befehl ist unten angegeben.
Accumulated value is -> 150