PySpark-ブロードキャストおよびアキュムレータ

並列処理の場合、ApacheSparkは共有変数を使用します。共有変数のコピーは、ドライバーがクラスター上のエグゼキューターにタスクを送信するときにクラスターの各ノードに送られるため、タスクの実行に使用できます。

ApacheSparkでサポートされている共有変数には2つのタイプがあります-

  • Broadcast
  • Accumulator

それらを詳しく理解しましょう。

放送

ブロードキャスト変数は、すべてのノード間でデータのコピーを保存するために使用されます。この変数はすべてのマシンにキャッシュされ、タスクのあるマシンには送信されません。次のコードブロックには、PySparkのBroadcastクラスの詳細が含まれています。

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

次の例は、Broadcast変数の使用方法を示しています。Broadcast変数には、valueという属性があります。この属性は、データを格納し、ブロードキャストされた値を返すために使用されます。

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command −ブロードキャスト変数のコマンドは次のとおりです−

$SPARK_HOME/bin/spark-submit broadcast.py

Output −次のコマンドの出力を以下に示します。

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

アキュムレータ

アキュムレータ変数は、結合法則と可換法則を介して情報を集約するために使用されます。たとえば、(MapReduce内の)合計演算またはカウンターにアキュムレーターを使用できます。次のコードブロックには、PySparkのアキュムレータクラスの詳細が含まれています。

class pyspark.Accumulator(aid, value, accum_param)

次の例は、アキュムレータ変数の使用方法を示しています。アキュムレータ変数には、ブロードキャスト変数と同様の値と呼ばれる属性があります。データを格納し、アキュムレータの値を返すために使用されますが、ドライバプログラムでのみ使用できます。

この例では、アキュムレータ変数が複数のワーカーによって使用され、累積値を返します。

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command −アキュムレータ変数のコマンドは次のとおりです。

$SPARK_HOME/bin/spark-submit accumulator.py

Output −上記のコマンドの出力を以下に示します。

Accumulated value is -> 150