PySpark - трансляция и накопитель

Для параллельной обработки Apache Spark использует общие переменные. Копия общей переменной отправляется на каждый узел кластера, когда драйвер отправляет задачу исполнителю в кластере, чтобы ее можно было использовать для выполнения задач.

Apache Spark поддерживает два типа общих переменных:

  • Broadcast
  • Accumulator

Давайте разберемся с ними подробнее.

Трансляция

Переменные широковещательной передачи используются для сохранения копии данных на всех узлах. Эта переменная кэшируется на всех машинах и не отправляется на машины с задачами. В следующем блоке кода содержится подробная информация о классе Broadcast для PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

В следующем примере показано, как использовать переменную Broadcast. Переменная Broadcast имеет атрибут с именем value, который хранит данные и используется для возврата транслируемого значения.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - Команда для широковещательной переменной выглядит следующим образом -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - Ниже приведены выходные данные для следующей команды.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Аккумулятор

Накопительные переменные используются для агрегирования информации посредством ассоциативных и коммутативных операций. Например, вы можете использовать аккумулятор для операции суммирования или счетчиков (в MapReduce). В следующем блоке кода содержится подробная информация о классе Accumulator для PySpark.

class pyspark.Accumulator(aid, value, accum_param)

В следующем примере показано, как использовать переменную Accumulator. У переменной Accumulator есть атрибут, называемый значением, который аналогичен тому, что имеет широковещательная переменная. Он хранит данные и используется для возврата значения аккумулятора, но может использоваться только в программе драйвера.

В этом примере аккумуляторная переменная используется несколькими рабочими процессами и возвращает накопленное значение.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - Команда для переменной аккумулятора выглядит следующим образом -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - Результат выполнения вышеуказанной команды приведен ниже.

Accumulated value is -> 150