PySpark-방송 및 누산기

병렬 처리를 위해 Apache Spark는 공유 변수를 사용합니다. 공유 변수의 복사본은 드라이버가 클러스터의 실행자에게 작업을 보낼 때 클러스터의 각 노드로 이동하므로 작업을 수행하는 데 사용할 수 있습니다.

Apache Spark에서 지원하는 두 가지 유형의 공유 변수가 있습니다.

  • Broadcast
  • Accumulator

자세히 이해합시다.

방송

브로드 캐스트 변수는 모든 노드에서 데이터 사본을 저장하는 데 사용됩니다. 이 변수는 모든 컴퓨터에 캐시되며 작업이있는 컴퓨터에는 전송되지 않습니다. 다음 코드 블록에는 PySpark에 대한 Broadcast 클래스의 세부 정보가 있습니다.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

다음 예제는 Broadcast 변수를 사용하는 방법을 보여줍니다. Broadcast 변수에는 데이터를 저장하고 브로드 캐스트 된 값을 반환하는 데 사용되는 value라는 속성이 있습니다.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command − 브로드 캐스트 변수에 대한 명령은 다음과 같습니다. −

$SPARK_HOME/bin/spark-submit broadcast.py

Output − 다음 명령의 출력은 다음과 같습니다.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

어큐뮬레이터

누산기 변수는 연관 및 교환 연산을 통해 정보를 집계하는 데 사용됩니다. 예를 들어 합계 연산 또는 카운터 (MapReduce에서)에 누산기를 사용할 수 있습니다. 다음 코드 블록에는 PySpark 용 Accumulator 클래스의 세부 정보가 있습니다.

class pyspark.Accumulator(aid, value, accum_param)

다음 예제는 Accumulator 변수를 사용하는 방법을 보여줍니다. Accumulator 변수에는 브로드 캐스트 변수와 유사한 value라는 속성이 있습니다. 데이터를 저장하고 누산기 값을 반환하는 데 사용되지만 드라이버 프로그램에서만 사용할 수 있습니다.

이 예제에서 accumulator 변수는 여러 작업자가 사용하고 누적 된 값을 반환합니다.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command − 누산기 변수에 대한 명령은 다음과 같습니다. −

$SPARK_HOME/bin/spark-submit accumulator.py

Output − 위 명령의 출력은 다음과 같습니다.

Accumulated value is -> 150