PySpark-クイックガイド

この章では、Apache Sparkとは何か、PySparkがどのように開発されたかを理解します。

Spark –概要

Apache Sparkは、超高速のリアルタイム処理フレームワークです。インメモリ計算を実行して、データをリアルタイムで分析します。それはとして絵になりましたApache Hadoop MapReduceバッチ処理のみを実行していて、リアルタイム処理機能がありませんでした。そのため、リアルタイムでストリーム処理を実行でき、バッチ処理も処理できるApacheSparkが導入されました。

リアルタイムおよびバッチ処理とは別に、ApacheSparkはインタラクティブクエリと反復アルゴリズムもサポートしています。Apache Sparkには、アプリケーションをホストできる独自のクラスターマネージャーがあります。ストレージと処理の両方にApacheHadoopを活用します。それは使用していますHDFS (Hadoop分散ファイルシステム)ストレージ用で、Sparkアプリケーションを実行できます YARN 同様に。

PySpark –概要

ApacheSparkはで書かれています Scala programming language。SparkでPythonをサポートするために、Apache SparkCommunityはツールPySparkをリリースしました。PySparkを使用すると、RDDsPythonプログラミング言語でも。それはと呼ばれる図書館のためですPy4j 彼らがこれを達成することができること。

PySparkは提供しています PySpark ShellPython APIをsparkコアにリンクし、Sparkコンテキストを初期化します。今日、データサイエンティストと分析の専門家の大多数は、豊富なライブラリセットのためにPythonを使用しています。PythonをSparkと統合することは、彼らにとって恩恵です。

この章では、PySparkの環境設定について理解します。

Note −これは、JavaとScalaがコンピューターにインストールされていることを考慮しています。

次の手順でPySparkをダウンロードしてセットアップしましょう。

Step 1−公式のApache Sparkダウンロードページに移動し、そこで入手可能な最新バージョンのApacheSparkをダウンロードします。このチュートリアルでは、spark-2.1.0-bin-hadoop2.7

Step 2−次に、ダウンロードしたSparktarファイルを抽出します。デフォルトでは、ダウンロードディレクトリにダウンロードされます。

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

ディレクトリを作成します spark-2.1.0-bin-hadoop2.7。PySparkを開始する前に、次の環境を設定してSparkパスとPy4j path

export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

または、上記の環境をグローバルに設定するには、それらを .bashrc file。次に、環境が機能するように次のコマンドを実行します。

# source .bashrc

すべての環境が設定されたので、Sparkディレクトリに移動し、次のコマンドを実行してPySparkシェルを呼び出します。

# ./bin/pyspark

これにより、PySparkシェルが起動します。

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

SparkContextは、Spark機能へのエントリポイントです。Sparkアプリケーションを実行すると、main関数を持つドライバープログラムが起動し、SparkContextがここで開始されます。次に、ドライバープログラムは、ワーカーノードのエグゼキューター内で操作を実行します。

SparkContextはPy4Jを使用して起動します JVM を作成します JavaSparkContext。デフォルトでは、PySparkにはSparkContextがあります。‘sc’そのため、新しいSparkContextの作成は機能しません。

次のコードブロックには、SparkContextが取ることができるPySparkクラスとパラメーターの詳細が含まれています。

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

パラメーター

以下は、SparkContextのパラメーターです。

  • Master −接続先のクラスターのURLです。

  • appName −あなたの仕事の名前。

  • sparkHome −Sparkインストールディレクトリ。

  • pyFiles −クラスターに送信してPYTHONPATHに追加する.zipまたは.pyファイル。

  • Environment −ワーカーノードの環境変数。

  • batchSize−単一のJavaオブジェクトとして表されるPythonオブジェクトの数。バッチ処理を無効にするには1を設定し、オブジェクトサイズに基づいてバッチサイズを自動的に選択するには0を設定し、無制限のバッチサイズを使用するには-1を設定します。

  • Serializer −RDDシリアライザー。

  • Conf −すべてのSparkプロパティを設定するためのL {SparkConf}のオブジェクト。

  • Gateway −既存のゲートウェイとJVMを使用します。それ以外の場合は、新しいJVMを初期化します。

  • JSC −JavaSparkContextインスタンス。

  • profiler_cls −プロファイリングを行うために使用されるカスタムプロファイラーのクラス(デフォルトはpyspark.profiler.BasicProfilerです)。

上記のパラメータの中で、 master そして appname主に使用されます。PySparkプログラムの最初の2行は、次のようになります。

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContextの例–PySparkシェル

SparkContextについて十分に理解したところで、PySparkシェルで簡単な例を実行してみましょう。この例では、文字「a」または「b」を含む行数をカウントします。README.mdファイル。したがって、ファイルに5行あり、3行に文字「a」が含まれている場合、出力は→になります。Line with a: 3。文字「b」についても同様です。

Note− PySparkシェルの起動時に、デフォルトでSparkがscという名前のSparkContextオブジェクトを自動的に作成するため、次の例ではSparkContextオブジェクトを作成していません。別のSparkContextオブジェクトを作成しようとすると、次のエラーが発生します–"ValueError: Cannot run multiple SparkContexts at once".

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContextの例-Pythonプログラム

Pythonプログラムを使用して同じ例を実行してみましょう。と呼ばれるPythonファイルを作成しますfirstapp.py そのファイルに次のコードを入力します。

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

次に、ターミナルで次のコマンドを実行して、このPythonファイルを実行します。上記と同じ出力が得られます。

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

システムにPySparkをインストールして構成したので、ApacheSparkでPythonでプログラミングできます。ただし、その前に、Sparkの基本的な概念であるRDDについて理解しましょう。

RDDはの略です Resilient Distributed Dataset、これらは、クラスター上で並列処理を実行するために複数のノードで実行および操作される要素です。RDDは不変の要素です。つまり、RDDを作成すると、それを変更することはできません。RDDはフォールトトレラントでもあるため、障害が発生した場合は自動的に回復します。これらのRDDに複数の操作を適用して、特定のタスクを実行できます。

これらのRDDに操作を適用するには、2つの方法があります-

  • 変革と
  • Action

これら2つの方法を詳しく理解しましょう。

Transformation−これらは、新しいRDDを作成するためにRDDに適用される操作です。Filter、groupBy、mapは変換の例です。

Action −これらは、RDDに適用される操作であり、Sparkに計算を実行し、結果をドライバーに送り返すように指示します。

PySparkで操作を適用するには、を作成する必要があります PySpark RDD最初。次のコードブロックには、PySparkRDDクラスの詳細が含まれています-

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

PySparkを使用していくつかの基本的な操作を実行する方法を見てみましょう。Pythonファイルの次のコードは、言及された単語のセットを格納するRDD単語を作成します。

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

次に、単語に対していくつかの操作を実行します。

カウント()

RDDの要素数が返されます。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command − count()のコマンドは−です。

$SPARK_HOME/bin/spark-submit count.py

Output −上記のコマンドの出力は−です。

Number of elements in RDD → 8

collect()

RDD内のすべての要素が返されます。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command − collect()のコマンドは−です。

$SPARK_HOME/bin/spark-submit collect.py

Output −上記のコマンドの出力は−です。

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach(f)

foreach内の関数の条件を満たす要素のみを返します。次の例では、foreachでprint関数を呼び出します。この関数は、RDD内のすべての要素を出力します。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command − foreach(f)のコマンドは−です。

$SPARK_HOME/bin/spark-submit foreach.py

Output −上記のコマンドの出力は−です。

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

フィルタ内の関数を満たす要素を含む新しいRDDが返されます。次の例では、「spark」を含む文字列を除外します。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command − filter(f)のコマンドは−です。

$SPARK_HOME/bin/spark-submit filter.py

Output −上記のコマンドの出力は−です。

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map(f、preservesPartitioning = False)

RDDの各要素に関数を適用すると、新しいRDDが返されます。次の例では、キーと値のペアを形成し、すべての文字列を値1でマップします。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command − map(f、preservesPartitioning = False)のコマンドは−です。

$SPARK_HOME/bin/spark-submit map.py

Output −上記のコマンドの出力は−です。

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

reduce(f)

指定された可換および連想二項演算を実行した後、RDDの要素が返されます。次の例では、演算子からaddパッケージをインポートし、それを「num」に適用して、単純な加算操作を実行しています。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command − reduce(f)のコマンドは−です。

$SPARK_HOME/bin/spark-submit reduce.py

Output −上記のコマンドの出力は−です。

Adding all the elements -> 15

join(other、numPartitions = None)

一致するキーとその特定のキーのすべての値を持つ要素のペアを含むRDDを返します。次の例では、2つの異なるRDDに2組の要素があります。これらの2つのRDDを結合した後、一致するキーとその値を持つ要素を持つRDDを取得します。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command − join(other、numPartitions = None)のコマンドは−です。

$SPARK_HOME/bin/spark-submit join.py

Output −上記のコマンドの出力は−です。

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

キャッシュ()

このRDDをデフォルトのストレージレベル(MEMORY_ONLY)で永続化します。RDDがキャッシュされているかどうかを確認することもできます。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command − cache()のコマンドは−です。

$SPARK_HOME/bin/spark-submit cache.py

Output −上記のプログラムの出力は−です。

Words got cached -> True

これらは、PySparkRDDで実行される最も重要な操作の一部でした。

並列処理の場合、ApacheSparkは共有変数を使用します。共有変数のコピーは、ドライバーがクラスター上のエグゼキューターにタスクを送信するときにクラスターの各ノードに送られるため、タスクの実行に使用できます。

ApacheSparkでサポートされている共有変数には2つのタイプがあります-

  • Broadcast
  • Accumulator

それらを詳しく理解しましょう。

放送

ブロードキャスト変数は、すべてのノード間でデータのコピーを保存するために使用されます。この変数はすべてのマシンにキャッシュされ、タスクのあるマシンには送信されません。次のコードブロックには、PySparkのBroadcastクラスの詳細が含まれています。

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

次の例は、Broadcast変数の使用方法を示しています。Broadcast変数には、valueという属性があります。この属性は、データを格納し、ブロードキャストされた値を返すために使用されます。

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command −ブロードキャスト変数のコマンドは次のとおりです−

$SPARK_HOME/bin/spark-submit broadcast.py

Output −次のコマンドの出力を以下に示します。

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

アキュムレータ

アキュムレータ変数は、結合法則と可換法則を介して情報を集約するために使用されます。たとえば、(MapReduce内の)合計演算またはカウンターにアキュムレーターを使用できます。次のコードブロックには、PySparkのアキュムレータクラスの詳細が含まれています。

class pyspark.Accumulator(aid, value, accum_param)

次の例は、アキュムレータ変数の使用方法を示しています。アキュムレータ変数には、ブロードキャスト変数と同様の値と呼ばれる属性があります。データを格納し、アキュムレータの値を返すために使用されますが、ドライバプログラムでのみ使用できます。

この例では、アキュムレータ変数が複数のワーカーによって使用され、累積値を返します。

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command −アキュムレータ変数のコマンドは次のとおりです。

$SPARK_HOME/bin/spark-submit accumulator.py

Output −上記のコマンドの出力を以下に示します。

Accumulated value is -> 150

ローカル/クラスターでSparkアプリケーションを実行するには、いくつかの構成とパラメーターを設定する必要があります。これは、SparkConfが役立つものです。Sparkアプリケーションを実行するための構成を提供します。次のコードブロックには、PySparkのSparkConfクラスの詳細が含まれています。

class pyspark.SparkConf (
   loadDefaults = True, 
   _jvm = None, 
   _jconf = None
)

最初に、SparkConf()を使用してSparkConfオブジェクトを作成します。これにより、 spark.*Javaシステムのプロパティも同様です。これで、SparkConfオブジェクトを使用してさまざまなパラメーターを設定でき、それらのパラメーターはシステムプロパティよりも優先されます。

SparkConfクラスには、連鎖をサポートするセッターメソッドがあります。たとえば、あなたは書くことができますconf.setAppName(“PySpark App”).setMaster(“local”)。SparkConfオブジェクトをApacheSparkに渡すと、どのユーザーも変更できなくなります。

以下は、SparkConfの最も一般的に使用される属性の一部です。

  • set(key, value) −構成プロパティを設定します。

  • setMaster(value) −マスターURLを設定します。

  • setAppName(value) −アプリケーション名を設定します。

  • get(key, defaultValue=None) −キーの構成値を取得します。

  • setSparkHome(value) −ワーカーノードにSparkインストールパスを設定します。

PySparkプログラムでSparkConfを使用する次の例を考えてみましょう。この例では、sparkアプリケーション名を次のように設定しています。PySpark App SparkアプリケーションのマスターURLを→に設定します spark://master:7077

次のコードブロックには行があり、Pythonファイルに追加されると、PySparkアプリケーションを実行するための基本的な構成が設定されます。

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

Apache Sparkでは、を使用してファイルをアップロードできます sc.addFile (scはデフォルトのSparkContextです)を使用してワーカーのパスを取得します SparkFiles.get。したがって、SparkFilesは、を介して追加されたファイルへのパスを解決しますSparkContext.addFile()

SparkFilesには、次のクラスメソッドが含まれています-

  • get(filename)
  • getrootdirectory()

それらを詳しく理解しましょう。

get(ファイル名)

SparkContext.addFile()を介して追加されるファイルのパスを指定します。

getrootdirectory()

これは、SparkContext.addFile()を介して追加されるファイルを含むルートディレクトリへのパスを指定します。

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

Command −コマンドは次のとおりです−

$SPARK_HOME/bin/spark-submit sparkfiles.py

Output −上記のコマンドの出力は−です。

Absolute Path -> 
   /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

StorageLevelは、RDDの保存方法を決定します。Apache Sparkでは、StorageLevelは、RDDをメモリに保存するか、ディスクに保存するか、またはその両方を行うかを決定します。また、RDDをシリアル化するかどうか、およびRDDパーティションを複製するかどうかも決定します。

次のコードブロックには、StorageLevelのクラス定義があります-

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

ここで、RDDのストレージを決定するために、以下に示すさまざまなストレージレベルがあります。

  • DISK_ONLY = StorageLevel(True、False、False、False、1)

  • DISK_ONLY_2 = StorageLevel(True、False、False、False、2)

  • MEMORY_AND_DISK = StorageLevel(True、True、False、False、1)

  • MEMORY_AND_DISK_2 = StorageLevel(True、True、False、False、2)

  • MEMORY_AND_DISK_SER = StorageLevel(True、True、False、False、1)

  • MEMORY_AND_DISK_SER_2 = StorageLevel(True、True、False、False、2)

  • MEMORY_ONLY = StorageLevel(False、True、False、False、1)

  • MEMORY_ONLY_2 = StorageLevel(False、True、False、False、2)

  • MEMORY_ONLY_SER = StorageLevel(False、True、False、False、1)

  • MEMORY_ONLY_SER_2 = StorageLevel(False、True、False、False、2)

  • OFF_HEAP = StorageLevel(True、True、True、False、1)

ストレージレベルを使用する次のStorageLevelの例を考えてみましょう。 MEMORY_AND_DISK_2, これは、RDDパーティションのレプリケーションが2になることを意味します。

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local", 
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

Command −コマンドは次のとおりです−

$SPARK_HOME/bin/spark-submit storagelevel.py

Output −上記のコマンドの出力を以下に示します。

Disk Memory Serialized 2x Replicated

Apache Sparkは、と呼ばれる機械学習APIを提供します MLlib。PySparkには、Pythonにもこの機械学習APIがあります。以下に説明するさまざまな種類のアルゴリズムをサポートします-

  • mllib.classificationspark.mllibパッケージは、二項分類、多クラス分類、回帰分析のさまざまな方法をサポートしています。分類で最も人気のあるアルゴリズムのいくつかは次のとおりです。Random Forest, Naive Bayes, Decision Tree、など。

  • mllib.clustering −クラスタリングは教師なし学習の問題であり、類似性の概念に基づいてエンティティのサブセットを相互にグループ化することを目的としています。

  • mllib.fpm−頻繁なパターンマッチングとは、大規模なデータセットを分析するための最初のステップの1つである、頻繁なアイテム、アイテムセット、サブシーケンス、またはその他の下位構造をマイニングすることです。これは、何年もの間、データマイニングで活発な研究トピックとなっています。

  • mllib.linalg −線形代数用のMLlibユーティリティ。

  • mllib.recommendation−協調フィルタリングは、レコメンダーシステムで一般的に使用されます。これらの手法は、ユーザーアイテムの関連付けマトリックスの欠落しているエントリを埋めることを目的としています。

  • spark.mllib−現在、モデルベースの協調フィルタリングをサポートしています。この場合、ユーザーと製品は、欠落しているエントリを予測するために使用できる潜在的な要因の小さなセットによって記述されます。spark.mllibは、Alternating Least Squares(ALS)アルゴリズムを使用して、これらの潜在的要因を学習します。

  • mllib.regression−線形回帰は回帰アルゴリズムのファミリーに属しています。回帰の目的は、変数間の関係と依存関係を見つけることです。線形回帰モデルとモデルの要約を操作するためのインターフェースは、ロジスティック回帰の場合と似ています。

mllibパッケージの一部として、他のアルゴリズム、クラス、および関数もあります。今のところ、デモを理解しましょうpyspark.mllib

次の例は、ALSアルゴリズムを使用して推奨モデルを構築し、トレーニングデータで評価する協調フィルタリングの例です。

Dataset used − test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

Command −コマンドは次のようになります−

$SPARK_HOME/bin/spark-submit recommend.py

Output −上記のコマンドの出力は次のようになります−

Mean Squared Error = 1.20536041839e-05

シリアル化は、ApacheSparkのパフォーマンスチューニングに使用されます。ネットワーク経由で送信されるか、ディスクに書き込まれるか、メモリに保持されるすべてのデータは、シリアル化する必要があります。シリアル化は、コストのかかる操作で重要な役割を果たします。

PySparkは、パフォーマンスチューニング用のカスタムシリアライザーをサポートしています。次の2つのシリアライザーはPySparkでサポートされています-

MarshalSerializer

PythonのMarshalSerializerを使用してオブジェクトをシリアル化します。このシリアライザーはPickleSerializerよりも高速ですが、サポートするデータ型は少なくなります。

class pyspark.MarshalSerializer

PickleSerializer

PythonのPickleSerializerを使用してオブジェクトをシリアル化します。このシリアライザーは、ほぼすべてのPythonオブジェクトをサポートしますが、より特殊なシリアライザーほど高速ではない場合があります。

class pyspark.PickleSerializer

PySparkのシリアル化の例を見てみましょう。ここでは、MarshalSerializerを使用してデータをシリアル化します。

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

Command −コマンドは次のとおりです−

$SPARK_HOME/bin/spark-submit serializing.py

Output −上記のコマンドの出力は−です。

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]