PySpark - Siaran & Akumulator
Untuk pemrosesan paralel, Apache Spark menggunakan variabel bersama. Salinan variabel bersama masuk ke setiap node cluster saat driver mengirim tugas ke pelaksana di cluster, sehingga dapat digunakan untuk melakukan tugas.
Ada dua jenis variabel bersama yang didukung oleh Apache Spark -
- Broadcast
- Accumulator
Mari kita pahami secara detail.
Siaran
Variabel siaran digunakan untuk menyimpan salinan data di semua node. Variabel ini di-cache di semua mesin dan tidak dikirim ke mesin dengan tugas. Blok kode berikut memiliki detail kelas Broadcast untuk PySpark.
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
Contoh berikut menunjukkan cara menggunakan variabel Broadcast. Variabel Broadcast memiliki atribut yang disebut value, yang menyimpan data dan digunakan untuk mengembalikan nilai yang disiarkan.
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - Perintah untuk variabel broadcast adalah sebagai berikut -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - Output untuk perintah berikut diberikan di bawah ini.
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
Aki
Variabel akumulator digunakan untuk menggabungkan informasi melalui operasi asosiatif dan komutatif. Misalnya, Anda bisa menggunakan akumulator untuk operasi penjumlahan atau penghitung (di MapReduce). Blok kode berikut memiliki detail kelas Accumulator untuk PySpark.
class pyspark.Accumulator(aid, value, accum_param)
Contoh berikut menunjukkan cara menggunakan variabel Accumulator. Variabel akumulator memiliki atribut yang disebut nilai yang mirip dengan variabel siaran. Ini menyimpan data dan digunakan untuk mengembalikan nilai akumulator, tetapi hanya dapat digunakan dalam program driver.
Dalam contoh ini, variabel akumulator digunakan oleh beberapa pekerja dan menampilkan nilai terakumulasi.
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - Perintah untuk variabel akumulator adalah sebagai berikut -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - Output untuk perintah di atas diberikan di bawah ini.
Accumulated value is -> 150