Apache Spark-핵심 프로그래밍

Spark Core는 전체 프로젝트의 기반입니다. 분산 작업 디스패치, 스케줄링 및 기본 I / O 기능을 제공합니다. Spark는 시스템간에 분할 된 데이터의 논리적 모음 인 RDD (Resilient Distributed Datasets)라는 특수한 기본 데이터 구조를 사용합니다. RDD는 두 가지 방법으로 만들 수 있습니다. 하나는 외부 스토리지 시스템의 데이터 세트를 참조하는 것이고 두 번째는 기존 RDD에 변환 (예 : 맵, 필터, 리듀서, 조인)을 적용하는 것입니다.

RDD 추상화는 언어 통합 API를 통해 노출됩니다. 이는 애플리케이션이 RDD를 조작하는 방식이 로컬 데이터 콜렉션을 조작하는 것과 유사하기 때문에 프로그래밍 복잡성을 단순화합니다.

스파크 쉘

Spark는 대화식으로 데이터를 분석 할 수있는 강력한 도구 인 대화 형 셸을 제공합니다. Scala 또는 Python 언어로 제공됩니다. Spark의 기본 추상화는 RDD (Resilient Distributed Dataset)라고하는 분산 된 항목 모음입니다. RDD는 Hadoop 입력 형식 (예 : HDFS 파일)에서 만들거나 다른 RDD를 변환하여 만들 수 있습니다.

Spark Shell 열기

다음 명령은 Spark 셸을 여는 데 사용됩니다.

$ spark-shell

간단한 RDD 만들기

텍스트 파일에서 간단한 RDD를 생성 해 보겠습니다. 다음 명령을 사용하여 간단한 RDD를 만듭니다.

scala> val inputfile = sc.textFile(“input.txt”)

위 명령의 출력은 다음과 같습니다.

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API는 Transformations 그리고 거의 Actions RDD를 조작합니다.

RDD 변환

RDD 변환은 새 RDD에 대한 포인터를 반환하고 RDD간에 종속성을 만들 수 있습니다. 종속성 체인 (종속성 문자열)의 각 RDD에는 데이터를 계산하는 함수가 있으며 부모 RDD에 대한 포인터 (종속성)가 있습니다.

Spark는 게으 르기 때문에 작업 생성 및 실행을 트리거하는 일부 변환 또는 작업을 호출하지 않는 한 아무것도 실행되지 않습니다. 단어 개수 예제의 다음 스 니펫을보십시오.

따라서 RDD 변환은 데이터 집합이 아니라 Spark에 데이터를 가져 오는 방법과 데이터로 수행 할 작업을 알려주는 프로그램의 한 단계입니다 (유일한 단계 일 수 있음).

다음은 RDD 변환 목록입니다.

S. 아니 변환 및 의미
1

map(func)

함수를 통해 소스의 각 요소를 전달하여 형성된 새로운 분산 데이터 세트를 반환합니다. func.

2

filter(func)

소스의 해당 요소를 선택하여 형성된 새 데이터 세트를 반환합니다. func true를 반환합니다.

flatMap(func)

map과 유사하지만 각 입력 항목은 0 개 이상의 출력 항목에 매핑 될 수 있습니다 (따라서 func 는 단일 항목이 아닌 Seq를 반환해야 함).

4

mapPartitions(func)

map과 유사하지만 RDD의 각 파티션 (블록)에서 개별적으로 실행되므로 func 유형 T의 RDD에서 실행중인 경우 Iterator <T> ⇒ Iterator <U> 유형이어야합니다.

5

mapPartitionsWithIndex(func)

맵 파티션과 유사하지만 func 파티션의 인덱스를 나타내는 정수 값으로 func 유형 T의 RDD에서 실행중인 경우 (Int, Iterator <T>) ⇒ Iterator <U> 유형이어야합니다.

6

sample(withReplacement, fraction, seed)

샘플 A fraction 주어진 난수 생성기 시드를 사용하여 대체 여부에 관계없이 데이터의

7

union(otherDataset)

소스 데이터 세트의 요소와 인수의 합집합을 포함하는 새 데이터 세트를 반환합니다.

8

intersection(otherDataset)

소스 데이터 세트의 요소와 인수의 교차를 포함하는 새 RDD를 반환합니다.

9

distinct([numTasks])

소스 데이터 세트의 고유 한 요소가 포함 된 새 데이터 세트를 반환합니다.

10

groupByKey([numTasks])

(K, V) 쌍의 데이터 세트에서 호출되면 (K, Iterable <V>) 쌍의 데이터 세트를 반환합니다.

Note − 각 키에 대해 집계 (예 : 합계 또는 평균)를 수행하기 위해 그룹화하는 경우 reduceByKey 또는 aggregateByKey를 사용하면 훨씬 더 나은 성능을 얻을 수 있습니다.

11

reduceByKey(func, [numTasks])

(K, V) 쌍의 데이터 세트에서 호출되면 (K, V) 쌍의 데이터 세트를 반환합니다. 여기서 각 키의 값은 주어진 reduce 함수 func를 사용하여 집계되며 (V, V) ⇒ V 유형이어야합니다. groupByKey에서와 같이 축소 작업의 수는 선택적 두 번째 인수를 통해 구성 할 수 있습니다.

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

(K, V) 쌍의 데이터 세트에서 호출되면 주어진 결합 함수와 중립 "0"값을 사용하여 각 키의 값이 집계되는 (K, U) 쌍의 데이터 세트를 반환합니다. 불필요한 할당을 피하면서 입력 값 유형과 다른 집계 값 유형을 허용합니다. groupByKey와 마찬가지로 감소 작업의 수는 선택적 두 번째 인수를 통해 구성 할 수 있습니다.

13

sortByKey([ascending], [numTasks])

K가 Ordered를 구현하는 (K, V) 쌍의 데이터 세트에서 호출되면 부울 오름차순 인수에 지정된대로 키별로 오름차순 또는 내림차순으로 정렬 된 (K, V) 쌍의 데이터 세트를 반환합니다.

14

join(otherDataset, [numTasks])

(K, V) 및 (K, W) 유형의 데이터 세트에서 호출되면 각 키에 대한 모든 요소 쌍과 함께 (K, (V, W)) 쌍의 데이터 세트를 반환합니다. 외부 조인은 leftOuterJoin, rightOuterJoin 및 fullOuterJoin을 통해 지원됩니다.

15

cogroup(otherDataset, [numTasks])

(K, V) 및 (K, W) 유형의 데이터 세트에서 호출되면 (K, (Iterable <V>, Iterable <W>)) 튜플의 데이터 세트를 반환합니다. 이 작업을 그룹 포함이라고도합니다.

16

cartesian(otherDataset)

T 및 U 유형의 데이터 세트에서 호출되면 (T, U) 쌍 (모든 요소 쌍)의 데이터 세트를 반환합니다.

17

pipe(command, [envVars])

Perl 또는 bash 스크립트와 같은 쉘 명령을 통해 RDD의 각 파티션을 파이프하십시오. RDD 요소는 프로세스의 stdin에 기록되고 해당 stdout에 출력되는 라인은 문자열의 RDD로 반환됩니다.

18

coalesce(numPartitions)

RDD의 파티션 수를 numPartitions로 줄이십시오. 대규모 데이터 세트를 필터링 한 후 작업을보다 효율적으로 실행하는 데 유용합니다.

19

repartition(numPartitions)

RDD의 데이터를 무작위로 재편성하여 더 많거나 적은 파티션을 생성하고 이들간에 균형을 맞 춥니 다. 이것은 항상 네트워크를 통해 모든 데이터를 섞습니다.

20

repartitionAndSortWithinPartitions(partitioner)

주어진 파티 셔너에 따라 RDD를 다시 파티셔닝하고 각 결과 파티션 내에서 키별로 레코드를 정렬합니다. 이는 재분할을 호출 한 다음 각 파티션 내에서 정렬하는 것보다 효율적입니다. 정렬을 셔플 기계로 밀어 넣을 수 있기 때문입니다.

행위

다음 표는 값을 반환하는 작업 목록을 제공합니다.

S. 아니 행동 및 의미
1

reduce(func)

함수를 사용하여 데이터 세트의 요소 집계 func(두 개의 인수를 취하고 하나를 반환합니다). 함수는 병렬로 올바르게 계산 될 수 있도록 교환 및 연관이어야합니다.

2

collect()

드라이버 프로그램에서 데이터 셋의 모든 요소를 ​​배열로 반환합니다. 이는 일반적으로 충분히 작은 데이터 하위 집합을 반환하는 필터 또는 기타 작업 후에 유용합니다.

count()

데이터 세트의 요소 수를 반환합니다.

4

first()

데이터 세트의 첫 번째 요소를 반환합니다 (take (1)과 유사).

5

take(n)

첫 번째 배열을 반환합니다. n 데이터 세트의 요소.

6

takeSample (withReplacement,num, [seed])

무작위 샘플이있는 배열을 반환합니다. num 대체가 있거나없는 데이터 세트의 요소. 선택적으로 난수 생성기 시드를 미리 지정합니다.

7

takeOrdered(n, [ordering])

첫 번째를 반환합니다. n 자연 순서 또는 사용자 정의 비교기를 사용하여 RDD의 요소.

8

saveAsTextFile(path)

로컬 파일 시스템, HDFS 또는 기타 Hadoop 지원 파일 시스템의 지정된 디렉토리에 데이터 세트의 요소를 텍스트 파일 (또는 텍스트 파일 세트)로 씁니다. Spark는 각 요소에서 toString을 호출하여 파일의 텍스트 행으로 변환합니다.

9

saveAsSequenceFile(path) (Java and Scala)

데이터 세트의 요소를 로컬 파일 시스템, HDFS 또는 기타 Hadoop 지원 파일 시스템의 지정된 경로에 Hadoop SequenceFile로 씁니다. 이는 Hadoop의 쓰기 가능한 인터페이스를 구현하는 키-값 쌍의 RDD에서 사용할 수 있습니다. Scala에서는 암시 적으로 Writable로 변환 할 수있는 형식에서도 사용할 수 있습니다 (Spark에는 Int, Double, String 등과 같은 기본 형식에 대한 변환이 포함됨).

10

saveAsObjectFile(path) (Java and Scala)

Java 직렬화를 사용하여 데이터 세트의 요소를 간단한 형식으로 작성한 다음 SparkContext.objectFile ()을 사용하여로드 할 수 있습니다.

11

countByKey()

유형 (K, V)의 RDD에서만 사용할 수 있습니다. 각 키의 개수와 함께 (K, Int) 쌍의 해시 맵을 반환합니다.

12

foreach(func)

함수 실행 func데이터 세트의 각 요소에 대해 이는 일반적으로 Accumulator 업데이트 또는 외부 스토리지 시스템과의 상호 작용과 같은 부작용에 대해 수행됩니다.

Note− foreach () 외부에서 Accumulators 이외의 변수를 수정하면 정의되지 않은 동작이 발생할 수 있습니다. 자세한 내용은 폐쇄 이해를 참조하세요.

RDD로 프로그래밍

예제를 통해 RDD 프로그래밍에서 몇 가지 RDD 변환 및 작업의 구현을 살펴 보겠습니다.

단어 수의 예를 고려하십시오-문서에 나타나는 각 단어를 계산합니다. 다음 텍스트를 입력으로 고려하고input.txt 홈 디렉토리의 파일.

input.txt − 입력 파일.

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

아래의 절차에 따라 주어진 예제를 실행하십시오.

Spark-Shell 열기

다음 명령은 스파크 셸을 여는 데 사용됩니다. 일반적으로 Spark는 Scala를 사용하여 빌드됩니다. 따라서 Spark 프로그램은 Scala 환경에서 실행됩니다.

$ spark-shell

Spark 셸이 성공적으로 열리면 다음 출력을 찾을 수 있습니다. 출력 "Spark context available as sc"의 마지막 줄을 보면 Spark 컨테이너가 이름을 가진 Spark 컨텍스트 개체가 자동으로 생성됨을 의미합니다.sc. 프로그램의 첫 번째 단계를 시작하기 전에 SparkContext 개체를 만들어야합니다.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

RDD 생성

먼저 Spark-Scala API를 사용하여 입력 파일을 읽고 RDD를 만들어야합니다.

다음 명령은 지정된 위치에서 파일을 읽는 데 사용됩니다. 여기서는 inputfile의 이름으로 새로운 RDD가 생성됩니다. textFile (“”) 메서드에서 인수로 주어진 문자열은 입력 파일 이름의 절대 경로입니다. 그러나 파일 이름 만 제공되면 입력 파일이 현재 위치에 있음을 의미합니다.

scala> val inputfile = sc.textFile("input.txt")

단어 수 변환 실행

우리의 목표는 파일의 단어 수를 세는 것입니다. 각 줄을 단어로 분할하기위한 평면지도를 만듭니다 (flatMap(line ⇒ line.split(“ ”)).

다음으로 각 단어를 값이있는 키로 읽습니다. ‘1’ (<key, value> = <word, 1>)지도 기능 사용 (map(word ⇒ (word, 1)).

마지막으로 유사한 키의 값을 추가하여 해당 키를 줄입니다 (reduceByKey(_+_)).

다음 명령은 워드 카운트 로직을 실행하는 데 사용됩니다. 이 작업을 실행 한 후에는 작업이 아니기 때문에 출력을 찾을 수 없습니다. 이것은 변환입니다. 새로운 RDD를 가리 키거나 주어진 데이터로 무엇을해야하는지 스파크에게 지시)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

현재 RDD

RDD로 작업하는 동안 현재 RDD에 대해 알고 싶다면 다음 명령을 사용하십시오. 현재 RDD에 대한 설명과 디버깅을위한 종속성을 보여줍니다.

scala> counts.toDebugString

변환 캐싱

persist () 또는 cache () 메소드를 사용하여 RDD를 지속되도록 표시 할 수 있습니다. 작업에서 처음 계산 될 때 노드의 메모리에 보관됩니다. 다음 명령을 사용하여 중간 변환을 메모리에 저장합니다.

scala> counts.cache()

액션 적용

모든 변환 저장과 같은 동작을 적용하면 텍스트 파일이 생성됩니다. saveAsTextFile (“”) 메서드의 String 인수는 출력 폴더의 절대 경로입니다. 출력을 텍스트 파일로 저장하려면 다음 명령을 시도하십시오. 다음 예에서 'output'폴더는 현재 위치에 있습니다.

scala> counts.saveAsTextFile("output")

출력 확인

다른 터미널을 열어 홈 디렉터리로 이동합니다 (다른 터미널에서 스파크가 실행 됨). 출력 디렉토리를 확인하려면 다음 명령을 사용하십시오.

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

다음 명령은 다음에서 출력을 보는 데 사용됩니다. Part-00000 파일.

[hadoop@localhost output]$ cat part-00000

산출

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

다음 명령은 다음에서 출력을 보는 데 사용됩니다. Part-00001 파일.

[hadoop@localhost output]$ cat part-00001

산출

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

UN Persist the Storage

UN-persisting 전에이 애플리케이션에 사용되는 저장 공간을 확인하려면 브라우저에서 다음 URL을 사용하십시오.

http://localhost:4040

Spark 셸에서 실행중인 애플리케이션에 사용 된 저장 공간을 보여주는 다음 화면이 표시됩니다.

특정 RDD의 저장 공간을 유지 해제하려면 다음 명령을 사용하십시오.

Scala> counts.unpersist()

다음과 같이 출력이 표시됩니다.

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

브라우저의 저장 공간을 확인하려면 다음 URL을 사용하십시오.

http://localhost:4040/

다음 화면이 표시됩니다. Spark 셸에서 실행중인 애플리케이션에 사용 된 저장 공간을 보여줍니다.