PySpark-RDD
이제 시스템에 PySpark를 설치하고 구성 했으므로 Apache Spark에서 Python으로 프로그래밍 할 수 있습니다. 그러나 그렇게하기 전에 Spark-RDD의 기본 개념을 이해하겠습니다.
RDD는 Resilient Distributed Dataset, 이들은 클러스터에서 병렬 처리를 수행하기 위해 여러 노드에서 실행 및 작동하는 요소입니다. RDD는 변경 불가능한 요소이므로 RDD를 생성 한 후에는 변경할 수 없습니다. RDD도 내결함성이 있으므로 오류가 발생하면 자동으로 복구됩니다. 이러한 RDD에 여러 작업을 적용하여 특정 작업을 수행 할 수 있습니다.
이 RDD에 작업을 적용하려면 두 가지 방법이 있습니다.
- 변환 및
- Action
이 두 가지 방법을 자세히 이해합시다.
Transformation− 새로운 RDD를 생성하기 위해 RDD에 적용되는 작업입니다. Filter, groupBy 및 map은 변환의 예입니다.
Action − 이는 RDD에 적용되는 작업으로, Spark에 계산을 수행하고 결과를 드라이버로 다시 보내도록 지시합니다.
PySpark에서 작업을 적용하려면 PySpark RDD먼저. 다음 코드 블록은 PySpark RDD 클래스의 세부 사항을 가지고 있습니다-
class pyspark.RDD (
jrdd,
ctx,
jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
PySpark를 사용하여 몇 가지 기본 작업을 실행하는 방법을 살펴 보겠습니다. Python 파일의 다음 코드는 언급 된 단어 집합을 저장하는 RDD 단어를 만듭니다.
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
이제 단어에 대한 몇 가지 작업을 실행합니다.
카운트()
RDD의 요소 수가 반환됩니다.
----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------
Command − count () 명령은 −
$SPARK_HOME/bin/spark-submit count.py
Output − 위 명령의 출력은 −
Number of elements in RDD → 8
수집()
RDD의 모든 요소가 반환됩니다.
----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------
Command − collect () 명령은 −
$SPARK_HOME/bin/spark-submit collect.py
Output − 위 명령의 출력은 −
Elements in RDD -> [
'scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
foreach (f)
foreach 내부 함수의 조건을 충족하는 요소 만 반환합니다. 다음 예에서는 RDD의 모든 요소를 인쇄하는 foreach의 인쇄 함수를 호출합니다.
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
Command − foreach (f)에 대한 명령은 −
$SPARK_HOME/bin/spark-submit foreach.py
Output − 위 명령의 출력은 −
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
필터 (f)
필터 내부의 기능을 충족하는 요소를 포함하는 새 RDD가 반환됩니다. 다음 예에서는 ''spark "가 포함 된 문자열을 필터링합니다.
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------
Command − filter (f) 명령은 −
$SPARK_HOME/bin/spark-submit filter.py
Output − 위 명령의 출력은 −
Fitered RDD -> [
'spark',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
map (f, preservesPartitioning = False)
RDD의 각 요소에 함수를 적용하면 새 RDD가 반환됩니다. 다음 예에서는 키 값 쌍을 형성하고 모든 문자열을 값 1로 매핑합니다.
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------
Command − map (f, preservesPartitioning = False) 명령은 −
$SPARK_HOME/bin/spark-submit map.py
Output − 위 명령의 출력은 −
Key value pair -> [
('scala', 1),
('java', 1),
('hadoop', 1),
('spark', 1),
('akka', 1),
('spark vs hadoop', 1),
('pyspark', 1),
('pyspark and spark', 1)
]
감소 (f)
지정된 교환 및 연관 이진 연산을 수행 한 후 RDD의 요소가 반환됩니다. 다음 예제에서는 연산자에서 패키지 추가를 가져 와서 'num'에 적용하여 간단한 추가 작업을 수행합니다.
----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------
Command − reduce (f) 명령은 −
$SPARK_HOME/bin/spark-submit reduce.py
Output − 위 명령의 출력은 −
Adding all the elements -> 15
join (기타, numPartitions = 없음)
일치하는 키가있는 요소 쌍과 해당 특정 키에 대한 모든 값이있는 RDD를 반환합니다. 다음 예에서는 두 개의 서로 다른 RDD에 두 쌍의 요소가 있습니다. 이 두 RDD를 결합한 후 일치하는 키와 해당 값을 가진 요소가있는 RDD를 얻습니다.
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------
Command − join (other, numPartitions = None) 명령은 −
$SPARK_HOME/bin/spark-submit join.py
Output − 위 명령의 출력은 −
Join RDD -> [
('spark', (1, 2)),
('hadoop', (4, 5))
]
은닉처()
기본 저장소 수준 (MEMORY_ONLY)으로이 RDD를 유지합니다. RDD가 캐시되었는지 여부도 확인할 수 있습니다.
----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------
Command − cache () 명령은 −
$SPARK_HOME/bin/spark-submit cache.py
Output − 위 프로그램의 출력은 −
Words got cached -> True
이들은 PySpark RDD에서 수행되는 가장 중요한 작업 중 일부입니다.