PySpark-RDD

이제 시스템에 PySpark를 설치하고 구성 했으므로 Apache Spark에서 Python으로 프로그래밍 할 수 있습니다. 그러나 그렇게하기 전에 Spark-RDD의 기본 개념을 이해하겠습니다.

RDD는 Resilient Distributed Dataset, 이들은 클러스터에서 병렬 처리를 수행하기 위해 여러 노드에서 실행 및 작동하는 요소입니다. RDD는 변경 불가능한 요소이므로 RDD를 생성 한 후에는 변경할 수 없습니다. RDD도 내결함성이 있으므로 오류가 발생하면 자동으로 복구됩니다. 이러한 RDD에 여러 작업을 적용하여 특정 작업을 수행 할 수 있습니다.

이 RDD에 작업을 적용하려면 두 가지 방법이 있습니다.

  • 변환 및
  • Action

이 두 가지 방법을 자세히 이해합시다.

Transformation− 새로운 RDD를 생성하기 위해 RDD에 적용되는 작업입니다. Filter, groupBy 및 map은 변환의 예입니다.

Action − 이는 RDD에 적용되는 작업으로, Spark에 계산을 수행하고 결과를 드라이버로 다시 보내도록 지시합니다.

PySpark에서 작업을 적용하려면 PySpark RDD먼저. 다음 코드 블록은 PySpark RDD 클래스의 세부 사항을 가지고 있습니다-

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

PySpark를 사용하여 몇 가지 기본 작업을 실행하는 방법을 살펴 보겠습니다. Python 파일의 다음 코드는 언급 된 단어 집합을 저장하는 RDD 단어를 만듭니다.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

이제 단어에 대한 몇 가지 작업을 실행합니다.

카운트()

RDD의 요소 수가 반환됩니다.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command − count () 명령은 −

$SPARK_HOME/bin/spark-submit count.py

Output − 위 명령의 출력은 −

Number of elements in RDD → 8

수집()

RDD의 모든 요소가 반환됩니다.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command − collect () 명령은 −

$SPARK_HOME/bin/spark-submit collect.py

Output − 위 명령의 출력은 −

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

foreach 내부 함수의 조건을 충족하는 요소 만 반환합니다. 다음 예에서는 RDD의 모든 요소를 ​​인쇄하는 foreach의 인쇄 함수를 호출합니다.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command − foreach (f)에 대한 명령은 −

$SPARK_HOME/bin/spark-submit foreach.py

Output − 위 명령의 출력은 −

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

필터 (f)

필터 내부의 기능을 충족하는 요소를 포함하는 새 RDD가 반환됩니다. 다음 예에서는 ''spark "가 포함 된 문자열을 필터링합니다.

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command − filter (f) 명령은 −

$SPARK_HOME/bin/spark-submit filter.py

Output − 위 명령의 출력은 −

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, preservesPartitioning = False)

RDD의 각 요소에 함수를 적용하면 새 RDD가 반환됩니다. 다음 예에서는 키 값 쌍을 형성하고 모든 문자열을 값 1로 매핑합니다.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command − map (f, preservesPartitioning = False) 명령은 −

$SPARK_HOME/bin/spark-submit map.py

Output − 위 명령의 출력은 −

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

감소 (f)

지정된 교환 및 연관 이진 연산을 수행 한 후 RDD의 요소가 반환됩니다. 다음 예제에서는 연산자에서 패키지 추가를 가져 와서 'num'에 적용하여 간단한 추가 작업을 수행합니다.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command − reduce (f) 명령은 −

$SPARK_HOME/bin/spark-submit reduce.py

Output − 위 명령의 출력은 −

Adding all the elements -> 15

join (기타, numPartitions = 없음)

일치하는 키가있는 요소 쌍과 해당 특정 키에 대한 모든 값이있는 RDD를 반환합니다. 다음 예에서는 두 개의 서로 다른 RDD에 두 쌍의 요소가 있습니다. 이 두 RDD를 결합한 후 일치하는 키와 해당 값을 가진 요소가있는 RDD를 얻습니다.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command − join (other, numPartitions = None) 명령은 −

$SPARK_HOME/bin/spark-submit join.py

Output − 위 명령의 출력은 −

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

은닉처()

기본 저장소 수준 (MEMORY_ONLY)으로이 RDD를 유지합니다. RDD가 캐시되었는지 여부도 확인할 수 있습니다.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command − cache () 명령은 −

$SPARK_HOME/bin/spark-submit cache.py

Output − 위 프로그램의 출력은 −

Words got cached -> True

이들은 PySpark RDD에서 수행되는 가장 중요한 작업 중 일부입니다.