PySpark - Broadcast & Accumulator

สำหรับการประมวลผลแบบขนาน Apache Spark ใช้ตัวแปรที่ใช้ร่วมกัน สำเนาของตัวแปรที่ใช้ร่วมกันจะไปที่แต่ละโหนดของคลัสเตอร์เมื่อไดรเวอร์ส่งงานไปยังตัวดำเนินการบนคลัสเตอร์เพื่อให้สามารถใช้ในการปฏิบัติงานได้

มีตัวแปรที่ใช้ร่วมกันสองประเภทที่รองรับโดย Apache Spark -

  • Broadcast
  • Accumulator

ให้เราเข้าใจโดยละเอียด

ออกอากาศ

ตัวแปรบรอดคาสต์ถูกใช้เพื่อบันทึกสำเนาของข้อมูลในทุกโหนด ตัวแปรนี้ถูกแคชบนเครื่องทั้งหมดและไม่ถูกส่งไปยังเครื่องที่มีงาน บล็อกโค้ดต่อไปนี้มีรายละเอียดของคลาส Broadcast สำหรับ PySpark

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

ตัวอย่างต่อไปนี้แสดงวิธีการใช้ตัวแปร Broadcast ตัวแปร Broadcast มีแอตทริบิวต์ที่เรียกว่า value ซึ่งเก็บข้อมูลและใช้เพื่อส่งคืนค่าที่ออกอากาศ

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - คำสั่งสำหรับตัวแปรออกอากาศมีดังนี้ -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - ผลลัพธ์สำหรับคำสั่งต่อไปนี้ได้รับด้านล่าง

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

ตัวสะสม

ตัวแปร Accumulator ใช้สำหรับการรวบรวมข้อมูลผ่านการดำเนินการเชื่อมโยงและการสับเปลี่ยน ตัวอย่างเช่นคุณสามารถใช้ตัวสะสมสำหรับการดำเนินการรวมหรือตัวนับ (ใน MapReduce) บล็อกโค้ดต่อไปนี้มีรายละเอียดของคลาส Accumulator สำหรับ PySpark

class pyspark.Accumulator(aid, value, accum_param)

ตัวอย่างต่อไปนี้แสดงวิธีการใช้ตัวแปร Accumulator ตัวแปร Accumulator มีแอตทริบิวต์ที่เรียกว่าค่าที่คล้ายกับตัวแปรที่ออกอากาศ จัดเก็บข้อมูลและใช้เพื่อส่งคืนค่าของตัวสะสม แต่สามารถใช้ได้เฉพาะในโปรแกรมไดรเวอร์เท่านั้น

ในตัวอย่างนี้ผู้ปฏิบัติงานหลายคนใช้ตัวแปรตัวสะสมและส่งคืนค่าสะสม

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - คำสั่งสำหรับตัวแปรตัวสะสมมีดังนี้ -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - ผลลัพธ์สำหรับคำสั่งดังกล่าวแสดงไว้ด้านล่าง

Accumulated value is -> 150