PySpark - трансляция и накопитель
Для параллельной обработки Apache Spark использует общие переменные. Копия общей переменной отправляется на каждый узел кластера, когда драйвер отправляет задачу исполнителю в кластере, чтобы ее можно было использовать для выполнения задач.
Apache Spark поддерживает два типа общих переменных:
- Broadcast
- Accumulator
Давайте разберемся с ними подробнее.
Трансляция
Переменные широковещательной передачи используются для сохранения копии данных на всех узлах. Эта переменная кэшируется на всех машинах и не отправляется на машины с задачами. В следующем блоке кода содержится подробная информация о классе Broadcast для PySpark.
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
В следующем примере показано, как использовать переменную Broadcast. Переменная Broadcast имеет атрибут с именем value, который хранит данные и используется для возврата транслируемого значения.
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - Команда для широковещательной переменной выглядит следующим образом -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - Ниже приведены выходные данные для следующей команды.
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
Аккумулятор
Накопительные переменные используются для агрегирования информации посредством ассоциативных и коммутативных операций. Например, вы можете использовать аккумулятор для операции суммирования или счетчиков (в MapReduce). В следующем блоке кода содержится подробная информация о классе Accumulator для PySpark.
class pyspark.Accumulator(aid, value, accum_param)
В следующем примере показано, как использовать переменную Accumulator. У переменной Accumulator есть атрибут, называемый значением, который аналогичен тому, что имеет широковещательная переменная. Он хранит данные и используется для возврата значения аккумулятора, но может использоваться только в программе драйвера.
В этом примере аккумуляторная переменная используется несколькими рабочими процессами и возвращает накопленное значение.
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - Команда для переменной аккумулятора выглядит следующим образом -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - Результат выполнения вышеуказанной команды приведен ниже.
Accumulated value is -> 150