PySpark - Broadcast & Accumulator
สำหรับการประมวลผลแบบขนาน Apache Spark ใช้ตัวแปรที่ใช้ร่วมกัน สำเนาของตัวแปรที่ใช้ร่วมกันจะไปที่แต่ละโหนดของคลัสเตอร์เมื่อไดรเวอร์ส่งงานไปยังตัวดำเนินการบนคลัสเตอร์เพื่อให้สามารถใช้ในการปฏิบัติงานได้
มีตัวแปรที่ใช้ร่วมกันสองประเภทที่รองรับโดย Apache Spark -
- Broadcast
- Accumulator
ให้เราเข้าใจโดยละเอียด
ออกอากาศ
ตัวแปรบรอดคาสต์ถูกใช้เพื่อบันทึกสำเนาของข้อมูลในทุกโหนด ตัวแปรนี้ถูกแคชบนเครื่องทั้งหมดและไม่ถูกส่งไปยังเครื่องที่มีงาน บล็อกโค้ดต่อไปนี้มีรายละเอียดของคลาส Broadcast สำหรับ PySpark
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
ตัวอย่างต่อไปนี้แสดงวิธีการใช้ตัวแปร Broadcast ตัวแปร Broadcast มีแอตทริบิวต์ที่เรียกว่า value ซึ่งเก็บข้อมูลและใช้เพื่อส่งคืนค่าที่ออกอากาศ
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - คำสั่งสำหรับตัวแปรออกอากาศมีดังนี้ -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - ผลลัพธ์สำหรับคำสั่งต่อไปนี้ได้รับด้านล่าง
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
ตัวสะสม
ตัวแปร Accumulator ใช้สำหรับการรวบรวมข้อมูลผ่านการดำเนินการเชื่อมโยงและการสับเปลี่ยน ตัวอย่างเช่นคุณสามารถใช้ตัวสะสมสำหรับการดำเนินการรวมหรือตัวนับ (ใน MapReduce) บล็อกโค้ดต่อไปนี้มีรายละเอียดของคลาส Accumulator สำหรับ PySpark
class pyspark.Accumulator(aid, value, accum_param)
ตัวอย่างต่อไปนี้แสดงวิธีการใช้ตัวแปร Accumulator ตัวแปร Accumulator มีแอตทริบิวต์ที่เรียกว่าค่าที่คล้ายกับตัวแปรที่ออกอากาศ จัดเก็บข้อมูลและใช้เพื่อส่งคืนค่าของตัวสะสม แต่สามารถใช้ได้เฉพาะในโปรแกรมไดรเวอร์เท่านั้น
ในตัวอย่างนี้ผู้ปฏิบัติงานหลายคนใช้ตัวแปรตัวสะสมและส่งคืนค่าสะสม
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - คำสั่งสำหรับตัวแปรตัวสะสมมีดังนี้ -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - ผลลัพธ์สำหรับคำสั่งดังกล่าวแสดงไว้ด้านล่าง
Accumulated value is -> 150