PySpark-ブロードキャストおよびアキュムレータ
並列処理の場合、ApacheSparkは共有変数を使用します。共有変数のコピーは、ドライバーがクラスター上のエグゼキューターにタスクを送信するときにクラスターの各ノードに送られるため、タスクの実行に使用できます。
ApacheSparkでサポートされている共有変数には2つのタイプがあります-
- Broadcast
- Accumulator
それらを詳しく理解しましょう。
放送
ブロードキャスト変数は、すべてのノード間でデータのコピーを保存するために使用されます。この変数はすべてのマシンにキャッシュされ、タスクのあるマシンには送信されません。次のコードブロックには、PySparkのBroadcastクラスの詳細が含まれています。
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
次の例は、Broadcast変数の使用方法を示しています。Broadcast変数には、valueという属性があります。この属性は、データを格納し、ブロードキャストされた値を返すために使用されます。
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command −ブロードキャスト変数のコマンドは次のとおりです−
$SPARK_HOME/bin/spark-submit broadcast.py
Output −次のコマンドの出力を以下に示します。
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
アキュムレータ
アキュムレータ変数は、結合法則と可換法則を介して情報を集約するために使用されます。たとえば、(MapReduce内の)合計演算またはカウンターにアキュムレーターを使用できます。次のコードブロックには、PySparkのアキュムレータクラスの詳細が含まれています。
class pyspark.Accumulator(aid, value, accum_param)
次の例は、アキュムレータ変数の使用方法を示しています。アキュムレータ変数には、ブロードキャスト変数と同様の値と呼ばれる属性があります。データを格納し、アキュムレータの値を返すために使用されますが、ドライバプログラムでのみ使用できます。
この例では、アキュムレータ変数が複数のワーカーによって使用され、累積値を返します。
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command −アキュムレータ変数のコマンドは次のとおりです。
$SPARK_HOME/bin/spark-submit accumulator.py
Output −上記のコマンドの出力を以下に示します。
Accumulated value is -> 150