PySpark-퀵 가이드

이 장에서 우리는 Apache Spark가 무엇이며 PySpark가 어떻게 개발되었는지 알게 될 것입니다.

Spark – 개요

Apache Spark는 번개처럼 빠른 실시간 처리 프레임 워크입니다. 실시간으로 데이터를 분석하기 위해 메모리 내 계산을 수행합니다. 그것은 그림으로 나왔습니다.Apache Hadoop MapReduce일괄 처리 만 수행하고 실시간 처리 기능이 없었습니다. 이에 아파치 스파크는 실시간 스트림 처리가 가능하고 일괄 처리도 가능하다는 점에서 도입됐다.

실시간 및 일괄 처리 외에도 Apache Spark는 대화 형 쿼리 및 반복 알고리즘도 지원합니다. Apache Spark에는 애플리케이션을 호스팅 할 수있는 자체 클러스터 관리자가 있습니다. 스토리지와 처리 모두에 Apache Hadoop을 활용합니다. 그것은 사용합니다HDFS (Hadoop 분산 파일 시스템) 스토리지 용으로 Spark 애플리케이션을 실행할 수 있습니다. YARN 게다가.

PySpark – 개요

Apache Spark는 Scala programming language. Spark로 Python을 지원하기 위해 Apache Spark Community는 도구 인 PySpark를 출시했습니다. PySpark를 사용하면RDDs파이썬 프로그래밍 언어에서도. 라는 도서관 때문입니다.Py4j 이를 달성 할 수 있습니다.

PySpark는 PySpark ShellPython API를 Spark 코어에 연결하고 Spark 컨텍스트를 초기화합니다. 오늘날 대다수의 데이터 과학자와 분석 전문가는 풍부한 라이브러리 세트로 인해 Python을 사용합니다. Python을 Spark와 통합하는 것은 그들에게 유익합니다.

이 장에서는 PySpark의 환경 설정을 이해합니다.

Note − 컴퓨터에 Java와 Scala가 설치되어있는 것으로 간주합니다.

이제 다음 단계에 따라 PySpark를 다운로드하고 설정하겠습니다.

Step 1− 공식 Apache Spark 다운로드 페이지로 이동 하여 최신 버전의 Apache Spark를 다운로드 하십시오. 이 튜토리얼에서 우리는spark-2.1.0-bin-hadoop2.7.

Step 2− 이제 다운로드 한 Spark tar 파일을 추출합니다. 기본적으로 다운로드 디렉토리에 다운로드됩니다.

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

디렉토리를 생성합니다. spark-2.1.0-bin-hadoop2.7. PySpark를 시작하기 전에 다음 환경을 설정하여 Spark 경로 및Py4j path.

export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

또는 위의 환경을 전역 적으로 설정하려면 .bashrc file. 그런 다음 환경이 작동하려면 다음 명령을 실행하십시오.

# source .bashrc

이제 모든 환경이 설정되었으므로 Spark 디렉터리로 이동하여 다음 명령을 실행하여 PySpark 셸을 호출합니다.

# ./bin/pyspark

그러면 PySpark 셸이 시작됩니다.

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

SparkContext는 모든 스파크 기능의 진입 점입니다. Spark 애플리케이션을 실행하면 드라이버 프로그램이 시작됩니다.이 프로그램은 주요 기능을 포함하고 여기에서 SparkContext가 시작됩니다. 그런 다음 드라이버 프로그램은 작업자 노드의 실행기 내부에서 작업을 실행합니다.

SparkContext는 Py4J를 사용하여 JVM 그리고 생성 JavaSparkContext. 기본적으로 PySpark에는 다음과 같이 사용 가능한 SparkContext가 있습니다.‘sc’, 따라서 새 SparkContext 생성이 작동하지 않습니다.

다음 코드 블록에는 SparkContext가 취할 수있는 PySpark 클래스 및 매개 변수의 세부 사항이 있습니다.

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

매개 변수

다음은 SparkContext의 매개 변수입니다.

  • Master − 연결되는 클러스터의 URL입니다.

  • appName − 직업 이름.

  • sparkHome − Spark 설치 디렉토리.

  • pyFiles − 클러스터로 전송하고 PYTHONPATH에 추가 할 .zip 또는 .py 파일.

  • Environment − 작업자 노드 환경 변수.

  • batchSize− 단일 Java 객체로 표현되는 Python 객체의 수. 일괄 처리를 사용하지 않으려면 1을 설정하고, 개체 크기에 따라 일괄 처리 크기를 자동으로 선택하려면 0을 설정하고, 무제한 일괄 처리 크기를 사용하려면 -1을 설정합니다.

  • Serializer − RDD 시리얼 라이저.

  • Conf − 모든 Spark 속성을 설정하기위한 L {SparkConf}의 객체.

  • Gateway − 기존 게이트웨이와 JVM을 사용하고 그렇지 않으면 새 JVM을 초기화합니다.

  • JSC − JavaSparkContext 인스턴스.

  • profiler_cls − 프로파일 링을 수행하는 데 사용되는 사용자 정의 프로파일 러의 클래스 (기본값은 pyspark.profiler.BasicProfiler).

위의 매개 변수 중 masterappname주로 사용됩니다. PySpark 프로그램의 처음 두 줄은 다음과 같습니다.

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContext 예제 – PySpark 셸

이제 SparkContext에 대해 충분히 알았으므로 PySpark 셸에서 간단한 예제를 실행 해 보겠습니다. 이 예에서는 'a'또는 'b'문자가있는 줄 수를 계산합니다.README.md파일. 따라서 파일에 5 줄이 있고 3 줄에 'a'문자가 있으면 출력은 다음과 같습니다. →Line with a: 3. 문자 'b'도 마찬가지입니다.

Note− 다음 예에서는 SparkContext 객체를 생성하지 않습니다. 기본적으로 Spark는 PySpark 셸이 시작될 때 sc라는 SparkContext 객체를 자동으로 생성하기 때문입니다. 다른 SparkContext 개체를 만들려고하면 다음 오류가 발생합니다."ValueError: Cannot run multiple SparkContexts at once".

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContext 예제-Python 프로그램

Python 프로그램을 사용하여 동일한 예제를 실행 해 보겠습니다. 라는 Python 파일을 만듭니다.firstapp.py 해당 파일에 다음 코드를 입력하십시오.

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

그런 다음 터미널에서 다음 명령을 실행하여이 Python 파일을 실행합니다. 위와 동일한 출력을 얻을 수 있습니다.

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

이제 시스템에 PySpark를 설치하고 구성 했으므로 Apache Spark에서 Python으로 프로그래밍 할 수 있습니다. 그러나 그렇게하기 전에 Spark-RDD의 기본 개념을 이해하겠습니다.

RDD는 Resilient Distributed Dataset, 이들은 클러스터에서 병렬 처리를 수행하기 위해 여러 노드에서 실행 및 작동하는 요소입니다. RDD는 변경 불가능한 요소이므로 RDD를 생성 한 후에는 변경할 수 없습니다. RDD도 내결함성이 있으므로 오류가 발생하면 자동으로 복구됩니다. 이러한 RDD에 여러 작업을 적용하여 특정 작업을 수행 할 수 있습니다.

이 RDD에 작업을 적용하려면 두 가지 방법이 있습니다.

  • 변환 및
  • Action

이 두 가지 방법을 자세히 이해합시다.

Transformation− 새로운 RDD를 생성하기 위해 RDD에 적용되는 작업입니다. Filter, groupBy 및 map은 변환의 예입니다.

Action − 이는 RDD에 적용되는 작업으로, Spark에 계산을 수행하고 결과를 드라이버로 다시 보내도록 지시합니다.

PySpark에서 작업을 적용하려면 PySpark RDD먼저. 다음 코드 블록은 PySpark RDD 클래스의 세부 사항을 가지고 있습니다-

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

PySpark를 사용하여 몇 가지 기본 작업을 실행하는 방법을 살펴 보겠습니다. Python 파일의 다음 코드는 언급 된 단어 집합을 저장하는 RDD 단어를 만듭니다.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

이제 단어에 대한 몇 가지 작업을 실행합니다.

카운트()

RDD의 요소 수가 반환됩니다.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command − count () 명령은 −

$SPARK_HOME/bin/spark-submit count.py

Output − 위 명령의 출력은 −

Number of elements in RDD → 8

수집()

RDD의 모든 요소가 반환됩니다.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command − collect () 명령은 −

$SPARK_HOME/bin/spark-submit collect.py

Output − 위 명령의 출력은 −

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (f)

foreach 내부 함수의 조건을 충족하는 요소 만 반환합니다. 다음 예에서는 RDD의 모든 요소를 ​​인쇄하는 foreach의 인쇄 함수를 호출합니다.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command − foreach (f)에 대한 명령은 −

$SPARK_HOME/bin/spark-submit foreach.py

Output − 위 명령의 출력은 −

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

필터 (f)

필터 내부의 기능을 충족하는 요소를 포함하는 새 RDD가 반환됩니다. 다음 예에서는 ''spark "가 포함 된 문자열을 필터링합니다.

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command − filter (f) 명령은 −

$SPARK_HOME/bin/spark-submit filter.py

Output − 위 명령의 출력은 −

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map (f, preservesPartitioning = False)

RDD의 각 요소에 함수를 적용하면 새 RDD가 반환됩니다. 다음 예에서는 키 값 쌍을 형성하고 모든 문자열을 값 1로 매핑합니다.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command − map (f, preservesPartitioning = False) 명령은 −

$SPARK_HOME/bin/spark-submit map.py

Output − 위 명령의 출력은 −

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

감소 (f)

지정된 교환 및 연관 이진 연산을 수행 한 후 RDD의 요소가 반환됩니다. 다음 예제에서는 연산자에서 패키지 추가를 가져 와서 'num'에 적용하여 간단한 추가 작업을 수행합니다.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command − reduce (f) 명령은 −

$SPARK_HOME/bin/spark-submit reduce.py

Output − 위 명령의 출력은 −

Adding all the elements -> 15

join (기타, numPartitions = 없음)

일치하는 키가있는 요소 쌍과 해당 특정 키에 대한 모든 값이있는 RDD를 반환합니다. 다음 예에서는 두 개의 서로 다른 RDD에 두 쌍의 요소가 있습니다. 이 두 RDD를 결합한 후 일치하는 키와 해당 값을 가진 요소가있는 RDD를 얻습니다.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command − join (other, numPartitions = None) 명령은 −

$SPARK_HOME/bin/spark-submit join.py

Output − 위 명령의 출력은 −

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

은닉처()

기본 저장소 수준 (MEMORY_ONLY)으로이 RDD를 유지합니다. RDD가 캐시되었는지 여부도 확인할 수 있습니다.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command − cache () 명령은 −

$SPARK_HOME/bin/spark-submit cache.py

Output − 위 프로그램의 출력은 −

Words got cached -> True

이들은 PySpark RDD에서 수행되는 가장 중요한 작업 중 일부입니다.

병렬 처리를 위해 Apache Spark는 공유 변수를 사용합니다. 공유 변수의 복사본은 드라이버가 클러스터의 실행자에게 작업을 보낼 때 클러스터의 각 노드로 이동하므로 작업을 수행하는 데 사용할 수 있습니다.

Apache Spark에서 지원하는 두 가지 유형의 공유 변수가 있습니다.

  • Broadcast
  • Accumulator

자세히 이해합시다.

방송

브로드 캐스트 변수는 모든 노드에서 데이터 사본을 저장하는 데 사용됩니다. 이 변수는 모든 컴퓨터에 캐시되며 작업이있는 컴퓨터에는 전송되지 않습니다. 다음 코드 블록에는 PySpark에 대한 Broadcast 클래스의 세부 정보가 있습니다.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

다음 예제는 Broadcast 변수를 사용하는 방법을 보여줍니다. Broadcast 변수에는 데이터를 저장하고 브로드 캐스트 된 값을 반환하는 데 사용되는 value라는 속성이 있습니다.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command − 브로드 캐스트 변수에 대한 명령은 다음과 같습니다. −

$SPARK_HOME/bin/spark-submit broadcast.py

Output − 다음 명령의 출력은 다음과 같습니다.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

어큐뮬레이터

누산기 변수는 연관 및 교환 연산을 통해 정보를 집계하는 데 사용됩니다. 예를 들어 합계 연산 또는 카운터 (MapReduce에서)에 누산기를 사용할 수 있습니다. 다음 코드 블록에는 PySpark 용 Accumulator 클래스의 세부 정보가 있습니다.

class pyspark.Accumulator(aid, value, accum_param)

다음 예제는 Accumulator 변수를 사용하는 방법을 보여줍니다. Accumulator 변수에는 브로드 캐스트 변수와 유사한 value라는 속성이 있습니다. 데이터를 저장하고 누산기 값을 반환하는 데 사용되지만 드라이버 프로그램에서만 사용할 수 있습니다.

이 예제에서 accumulator 변수는 여러 작업자가 사용하고 누적 된 값을 반환합니다.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command − 누산기 변수에 대한 명령은 다음과 같습니다. −

$SPARK_HOME/bin/spark-submit accumulator.py

Output − 위 명령의 출력은 다음과 같습니다.

Accumulated value is -> 150

로컬 / 클러스터에서 Spark 애플리케이션을 실행하려면 몇 가지 구성 및 매개 변수를 설정해야합니다. 이것이 SparkConf가 도움이되는 것입니다. Spark 애플리케이션을 실행하기위한 구성을 제공합니다. 다음 코드 블록에는 PySpark 용 SparkConf 클래스의 세부 정보가 있습니다.

class pyspark.SparkConf (
   loadDefaults = True, 
   _jvm = None, 
   _jconf = None
)

처음에는 SparkConf ()를 사용하여 SparkConf 객체를 생성하여 다음에서 값을로드합니다. spark.*자바 시스템 속성도 마찬가지입니다. 이제 SparkConf 개체를 사용하여 다른 매개 변수를 설정할 수 있으며 해당 매개 변수가 시스템 속성보다 우선합니다.

SparkConf 클래스에는 연결을 지원하는 setter 메서드가 있습니다. 예를 들어 다음과 같이 작성할 수 있습니다.conf.setAppName(“PySpark App”).setMaster(“local”). SparkConf 객체를 Apache Spark에 전달하면 어떤 사용자도 수정할 수 없습니다.

다음은 가장 일반적으로 사용되는 SparkConf 속성 중 일부입니다.

  • set(key, value) − 구성 속성을 설정합니다.

  • setMaster(value) − 마스터 URL을 설정합니다.

  • setAppName(value) − 애플리케이션 이름을 설정합니다.

  • get(key, defaultValue=None) − 키의 구성 값을 얻으려면.

  • setSparkHome(value) − 작업자 노드에 Spark 설치 경로를 설정합니다.

PySpark 프로그램에서 SparkConf를 사용하는 다음 예제를 고려해 보겠습니다. 이 예에서는 스파크 애플리케이션 이름을 다음과 같이 설정합니다.PySpark App 스파크 애플리케이션의 마스터 URL을 → spark://master:7077.

다음 코드 블록에는 Python 파일에 추가 될 때 PySpark 애플리케이션을 실행하기위한 기본 구성이 설정되는 줄이 있습니다.

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

Apache Spark에서 다음을 사용하여 파일을 업로드 할 수 있습니다. sc.addFile (sc는 기본 SparkContext 임) 다음을 사용하여 작업자의 경로를 가져옵니다. SparkFiles.get. 따라서 SparkFiles는 다음을 통해 추가 된 파일의 경로를 확인합니다.SparkContext.addFile().

SparkFiles에는 다음 클래스 메서드가 포함되어 있습니다.

  • get(filename)
  • getrootdirectory()

자세히 이해합시다.

get (파일 이름)

SparkContext.addFile ()을 통해 추가되는 파일의 경로를 지정합니다.

getrootdirectory ()

SparkContext.addFile ()을 통해 추가 된 파일이 포함 된 루트 디렉터리의 경로를 지정합니다.

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

Command − 명령은 다음과 같습니다 −

$SPARK_HOME/bin/spark-submit sparkfiles.py

Output − 위 명령의 출력은 −

Absolute Path -> 
   /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

StorageLevel은 RDD를 저장하는 방법을 결정합니다. Apache Spark에서 StorageLevel은 RDD를 메모리에 저장할 것인지 디스크에 저장할 것인지 또는 둘 다에 저장할 것인지 결정합니다. 또한 RDD를 직렬화할지 여부와 RDD 파티션을 복제할지 여부도 결정합니다.

다음 코드 블록은 StorageLevel의 클래스 정의를 가지고 있습니다-

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

이제 RDD의 저장소를 결정하기 위해 다음과 같은 다양한 저장소 수준이 있습니다.

  • DISK_ONLY = StorageLevel (True, False, False, False, 1)

  • DISK_ONLY_2 = StorageLevel (True, False, False, False, 2)

  • MEMORY_AND_DISK = StorageLevel (True, True, False, False, 1)

  • MEMORY_AND_DISK_2 = StorageLevel (True, True, False, False, 2)

  • MEMORY_AND_DISK_SER = StorageLevel (True, True, False, False, 1)

  • MEMORY_AND_DISK_SER_2 = StorageLevel (True, True, False, False, 2)

  • MEMORY_ONLY = StorageLevel (False, True, False, False, 1)

  • MEMORY_ONLY_2 = StorageLevel (False, True, False, False, 2)

  • MEMORY_ONLY_SER = StorageLevel (False, True, False, False, 1)

  • MEMORY_ONLY_SER_2 = StorageLevel (False, True, False, False, 2)

  • OFF_HEAP = StorageLevel (True, True, True, False, 1)

스토리지 레벨을 사용하는 다음 StorageLevel 예제를 고려해 보겠습니다. MEMORY_AND_DISK_2, 즉, RDD 파티션의 복제는 2입니다.

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local", 
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

Command − 명령은 다음과 같습니다 −

$SPARK_HOME/bin/spark-submit storagelevel.py

Output − 위 명령의 출력은 다음과 같습니다. −

Disk Memory Serialized 2x Replicated

Apache Spark는 다음과 같은 Machine Learning API를 제공합니다. MLlib. PySpark는 Python으로도이 기계 학습 API를 가지고 있습니다. 아래에 언급 된 다른 종류의 알고리즘을 지원합니다.

  • mllib.classificationspark.mllib패키지는 이진 분류, 다중 클래스 분류 및 회귀 분석을위한 다양한 방법을 지원합니다. 분류에서 가장 많이 사용되는 알고리즘은 다음과 같습니다.Random Forest, Naive Bayes, Decision Tree

  • mllib.clustering − 클러스터링은 비지도 학습 문제로, 유사성 개념을 기반으로 엔티티의 하위 집합을 서로 그룹화하는 것을 목표로합니다.

  • mllib.fpm− 빈번한 패턴 매칭은 일반적으로 대규모 데이터 세트를 분석하는 첫 번째 단계에 포함되는 빈번한 항목, 항목 집합, 하위 시퀀스 또는 기타 하위 구조를 마이닝하는 것입니다. 이것은 수년간 데이터 마이닝에서 활발한 연구 주제였습니다.

  • mllib.linalg − 선형 대수를위한 MLlib 유틸리티.

  • mllib.recommendation− 협업 필터링은 일반적으로 추천 시스템에 사용됩니다. 이러한 기술은 사용자 항목 연관 매트릭스의 누락 된 항목을 채우는 것을 목표로합니다.

  • spark.mllib− 현재 모델 기반 협업 필터링을 지원합니다. 여기서 사용자와 제품은 누락 된 항목을 예측하는 데 사용할 수있는 작은 잠복 요소 집합으로 설명됩니다. spark.mllib는 이러한 잠재 요인을 학습하기 위해 대체 최소 제곱 (ALS) 알고리즘을 사용합니다.

  • mllib.regression− 선형 회귀는 회귀 알고리즘 계열에 속합니다. 회귀의 목표는 변수 간의 관계와 종속성을 찾는 것입니다. 선형 회귀 모델 및 모델 요약 작업을위한 인터페이스는 로지스틱 회귀 사례와 유사합니다.

mllib 패키지의 일부로 다른 알고리즘, 클래스 및 함수도 있습니다. 지금부터 데모를 이해하겠습니다.pyspark.mllib.

다음 예는 ALS 알고리즘을 사용하여 추천 모델을 구축하고 학습 데이터에서 평가하는 협업 필터링의 예입니다.

Dataset used − test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

Command − 명령은 다음과 같습니다 −

$SPARK_HOME/bin/spark-submit recommend.py

Output − 위 명령의 출력은 −

Mean Squared Error = 1.20536041839e-05

직렬화는 Apache Spark의 성능 조정에 사용됩니다. 네트워크를 통해 전송되거나 디스크에 기록되거나 메모리에 유지되는 모든 데이터는 직렬화되어야합니다. 직렬화는 비용이 많이 드는 작업에서 중요한 역할을합니다.

PySpark는 성능 조정을위한 사용자 지정 직렬 변환기를 지원합니다. 다음 두 시리얼 라이저는 PySpark에서 지원합니다.

MarshalSerializer

Python의 Marshal Serializer를 사용하여 객체를 직렬화합니다. 이 직렬 변환기는 PickleSerializer보다 빠르지 만 더 적은 데이터 유형을 지원합니다.

class pyspark.MarshalSerializer

PickleSerializer

Python의 Pickle Serializer를 사용하여 객체를 직렬화합니다. 이 직렬 변환기는 거의 모든 Python 객체를 지원하지만 더 전문화 된 직렬 변환기만큼 빠르지 않을 수 있습니다.

class pyspark.PickleSerializer

PySpark 직렬화에 대한 예를 살펴 보겠습니다. 여기서는 MarshalSerializer를 사용하여 데이터를 직렬화합니다.

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

Command − 명령은 다음과 같습니다 −

$SPARK_HOME/bin/spark-submit serializing.py

Output − 위 명령의 출력은 −

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]