PySpark - Siaran & Akumulator

Untuk pemrosesan paralel, Apache Spark menggunakan variabel bersama. Salinan variabel bersama masuk ke setiap node cluster saat driver mengirim tugas ke pelaksana di cluster, sehingga dapat digunakan untuk melakukan tugas.

Ada dua jenis variabel bersama yang didukung oleh Apache Spark -

  • Broadcast
  • Accumulator

Mari kita pahami secara detail.

Siaran

Variabel siaran digunakan untuk menyimpan salinan data di semua node. Variabel ini di-cache di semua mesin dan tidak dikirim ke mesin dengan tugas. Blok kode berikut memiliki detail kelas Broadcast untuk PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

Contoh berikut menunjukkan cara menggunakan variabel Broadcast. Variabel Broadcast memiliki atribut yang disebut value, yang menyimpan data dan digunakan untuk mengembalikan nilai yang disiarkan.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - Perintah untuk variabel broadcast adalah sebagai berikut -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - Output untuk perintah berikut diberikan di bawah ini.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Aki

Variabel akumulator digunakan untuk menggabungkan informasi melalui operasi asosiatif dan komutatif. Misalnya, Anda bisa menggunakan akumulator untuk operasi penjumlahan atau penghitung (di MapReduce). Blok kode berikut memiliki detail kelas Accumulator untuk PySpark.

class pyspark.Accumulator(aid, value, accum_param)

Contoh berikut menunjukkan cara menggunakan variabel Accumulator. Variabel akumulator memiliki atribut yang disebut nilai yang mirip dengan variabel siaran. Ini menyimpan data dan digunakan untuk mengembalikan nilai akumulator, tetapi hanya dapat digunakan dalam program driver.

Dalam contoh ini, variabel akumulator digunakan oleh beberapa pekerja dan menampilkan nilai terakumulasi.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - Perintah untuk variabel akumulator adalah sebagai berikut -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - Output untuk perintah di atas diberikan di bawah ini.

Accumulated value is -> 150