ApacheSpark-コアプログラミング
SparkCoreはプロジェクト全体の基盤です。分散タスクのディスパッチ、スケジューリング、および基本的なI / O機能を提供します。Sparkは、マシン間でパーティション化されたデータの論理コレクションであるRDD(Resilient Distributed Datasets)と呼ばれる特殊な基本データ構造を使用します。RDDは2つの方法で作成できます。1つは外部ストレージシステムのデータセットを参照する方法で、もう1つは既存のRDDに変換(マップ、フィルター、レデューサー、結合など)を適用する方法です。
RDDの抽象化は、言語統合APIを介して公開されます。これにより、アプリケーションがRDDを操作する方法が、データのローカルコレクションを操作する方法と似ているため、プログラミングの複雑さが簡素化されます。
Spark Shell
Sparkは、インタラクティブなシェルを提供します。これは、データをインタラクティブに分析するための強力なツールです。ScalaまたはPython言語で利用できます。Sparkの主な抽象化は、Resilient Distributed Dataset(RDD)と呼ばれるアイテムの分散コレクションです。RDDは、Hadoop入力形式(HDFSファイルなど)から、または他のRDDを変換することによって作成できます。
SparkShellを開く
次のコマンドを使用してSparkシェルを開きます。
$ spark-shell
簡単なRDDを作成する
テキストファイルから簡単なRDDを作成しましょう。次のコマンドを使用して、単純なRDDを作成します。
scala> val inputfile = sc.textFile(“input.txt”)
上記のコマンドの出力は次のとおりです。
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
Spark RDDAPIはいくつかを導入します Transformations そして少数 Actions RDDを操作します。
RDD変換
RDD変換は、新しいRDDへのポインターを返し、RDD間の依存関係を作成できるようにします。依存関係チェーン(依存関係の文字列)内の各RDDには、データを計算するための関数があり、親RDDへのポインター(依存関係)があります。
Sparkは怠惰なので、ジョブの作成と実行をトリガーする変換またはアクションを呼び出さない限り、何も実行されません。ワードカウントの例の次のスニペットを見てください。
したがって、RDD変換はデータのセットではなく、Sparkにデータの取得方法とその処理方法を指示するプログラムのステップ(唯一のステップである可能性があります)です。
以下に、RDD変換のリストを示します。
S.No | 変換と意味 |
---|---|
1 | map(func) ソースの各要素を関数に渡すことによって形成された、新しい分散データセットを返します func。 |
2 | filter(func) ソースの要素を選択して形成された新しいデータセットを返します。 func trueを返します。 |
3 | flatMap(func) mapに似ていますが、各入力アイテムを0個以上の出力アイテムにマップできます(したがって、funcは単一のアイテムではなくSeqを返す必要があります)。 |
4 | mapPartitions(func) マップに似ていますが、RDDの各パーティション(ブロック)で個別に実行されるため、 func タイプTのRDDで実行する場合は、タイプIterator <T>⇒Iterator<U>である必要があります。 |
5 | mapPartitionsWithIndex(func) マップパーティションに似ていますが、 func パーティションのインデックスを表す整数値で、 func タイプTのRDDで実行する場合は、タイプ(Int、Iterator <T>)⇒Iterator<U>である必要があります。 |
6 | sample(withReplacement, fraction, seed) サンプル fraction 与えられた乱数ジェネレータシードを使用した、置換の有無にかかわらず、データの。 |
7 | union(otherDataset) ソースデータセットと引数の要素の和集合を含む新しいデータセットを返します。 |
8 | intersection(otherDataset) ソースデータセットの要素と引数の共通部分を含む新しいRDDを返します。 |
9 | distinct([numTasks]) ソースデータセットの個別の要素を含む新しいデータセットを返します。 |
10 | groupByKey([numTasks]) (K、V)ペアのデータセットで呼び出されると、(K、Iterable <V>)ペアのデータセットを返します。 Note −各キーに対して集計(合計や平均など)を実行するためにグループ化する場合、reduceByKeyまたはaggregateByKeyを使用するとパフォーマンスが大幅に向上します。 |
11 | reduceByKey(func, [numTasks]) (K、V)ペアのデータセットで呼び出されると、(K、V)ペアのデータセットを返します。ここで、各キーの値は、指定されたreduce関数funcを使用して集計されます。これは、タイプ(V、V)⇒Vである必要があります。 。groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数を使用して構成できます。 |
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) (K、V)ペアのデータセットで呼び出されると、(K、U)ペアのデータセットを返します。ここで、各キーの値は、指定された結合関数とニュートラルな「ゼロ」値を使用して集計されます。不要な割り当てを回避しながら、入力値タイプとは異なる集計値タイプを許可します。groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数を使用して構成できます。 |
13 | sortByKey([ascending], [numTasks]) KがOrderedを実装する(K、V)ペアのデータセットで呼び出されると、ブール昇順引数で指定されているように、キーで昇順または降順でソートされた(K、V)ペアのデータセットを返します。 |
14 | join(otherDataset, [numTasks]) タイプ(K、V)および(K、W)のデータセットで呼び出されると、各キーの要素のすべてのペアを含む(K、(V、W))ペアのデータセットを返します。外部結合は、leftOuterJoin、rightOuterJoin、およびfullOuterJoinを介してサポートされます。 |
15 | cogroup(otherDataset, [numTasks]) タイプ(K、V)および(K、W)のデータセットで呼び出されると、(K、(Iterable <V>、Iterable <W>))タプルのデータセットを返します。この操作は、グループWithとも呼ばれます。 |
16 | cartesian(otherDataset) タイプTおよびUのデータセットで呼び出されると、(T、U)ペア(要素のすべてのペア)のデータセットを返します。 |
17 | pipe(command, [envVars]) Perlまたはbashスクリプトなどのシェルコマンドを使用して、RDDの各パーティションをパイプします。RDD要素はプロセスのstdinに書き込まれ、そのstdoutに出力された行は文字列のRDDとして返されます。 |
18 | coalesce(numPartitions) RDD内のパーティションの数をnumPartitionsに減らします。大規模なデータセットをフィルタリングした後、操作をより効率的に実行するのに役立ちます。 |
19 | repartition(numPartitions) RDD内のデータをランダムに再シャッフルして、作成するパーティションの数を増やしたり減らしたりして、パーティション間でバランスを取ります。これにより、常にネットワーク上のすべてのデータがシャッフルされます。 |
20 | repartitionAndSortWithinPartitions(partitioner) 指定されたパーティショナーに従ってRDDを再パーティション化し、結果の各パーティション内で、キーでレコードをソートします。これは、再パーティションを呼び出してから各パーティション内でソートするよりも効率的です。これは、ソートをシャッフル機構にプッシュダウンできるためです。 |
行動
次の表に、値を返すアクションのリストを示します。
S.No | アクションと意味 |
---|---|
1 | reduce(func) 関数を使用してデータセットの要素を集約します func(これは2つの引数を取り、1つを返します)。関数は、並列で正しく計算できるように、可換で結合的である必要があります。 |
2 | collect() データセットのすべての要素をドライバープログラムで配列として返します。これは通常、データの十分に小さいサブセットを返すフィルターまたはその他の操作の後に役立ちます。 |
3 | count() データセット内の要素の数を返します。 |
4 | first() データセットの最初の要素を返します(take(1)と同様)。 |
5 | take(n) 最初の配列を返します n データセットの要素。 |
6 | takeSample (withReplacement,num, [seed]) のランダムサンプルを含む配列を返します num 置換の有無にかかわらず、データセットの要素。オプションで、乱数ジェネレータシードを事前に指定します。 |
7 | takeOrdered(n, [ordering]) 最初を返します n 自然な順序またはカスタムコンパレータのいずれかを使用するRDDの要素。 |
8 | saveAsTextFile(path) データセットの要素を、ローカルファイルシステム、HDFS、またはその他のHadoopでサポートされているファイルシステムの特定のディレクトリにテキストファイル(またはテキストファイルのセット)として書き込みます。Sparkは、各要素でtoStringを呼び出して、ファイル内のテキスト行に変換します。 |
9 | saveAsSequenceFile(path) (Java and Scala) データセットの要素を、ローカルファイルシステム、HDFS、またはその他のHadoopでサポートされているファイルシステムの特定のパスにHadoopSequenceFileとして書き込みます。これは、Hadoopの書き込み可能インターフェースを実装するキーと値のペアのRDDで利用できます。Scalaでは、暗黙的に書き込み可能に変換可能な型でも使用できます(Sparkには、Int、Double、Stringなどの基本型の変換が含まれます)。 |
10 | saveAsObjectFile(path) (Java and Scala) Javaシリアル化を使用してデータセットの要素を単純な形式で書き込み、SparkContext.objectFile()を使用してロードできます。 |
11 | countByKey() タイプ(K、V)のRDDでのみ使用できます。(K、Int)ペアのハッシュマップと各キーの数を返します。 |
12 | foreach(func) 関数を実行します funcデータセットの各要素。これは通常、アキュムレータの更新や外部ストレージシステムとの相互作用などの副作用のために行われます。 Note− foreach()の外部でアキュムレータ以外の変数を変更すると、未定義の動作が発生する可能性があります。詳細については、クロージャについてを参照してください。 |
RDDを使用したプログラミング
例を使用して、RDDプログラミングでのいくつかのRDD変換とアクションの実装を見てみましょう。
例
単語カウントの例を考えてみましょう-ドキュメントに表示される各単語をカウントします。次のテキストを入力として考え、として保存されますinput.txt ホームディレクトリ内のファイル。
input.txt −入力ファイル。
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
以下の手順に従って、所定の例を実行してください。
Spark-Shellを開く
次のコマンドを使用して、スパークシェルを開きます。通常、sparkはScalaを使用して構築されます。したがって、SparkプログラムはScala環境で実行されます。
$ spark-shell
Sparkシェルが正常に開くと、次の出力が表示されます。出力の最後の行を見てください。「scとして利用可能なSparkコンテキスト」は、Sparkコンテナが自動的に作成された名前のSparkコンテキストオブジェクトを意味しますsc。プログラムの最初のステップを開始する前に、SparkContextオブジェクトを作成する必要があります。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
RDDを作成する
まず、Spark-Scala APIを使用して入力ファイルを読み取り、RDDを作成する必要があります。
次のコマンドは、指定された場所からファイルを読み取るために使用されます。ここでは、inputfileという名前で新しいRDDが作成されます。textFile(“”)メソッドで引数として指定される文字列は、入力ファイル名の絶対パスです。ただし、ファイル名のみが指定されている場合は、入力ファイルが現在の場所にあることを意味します。
scala> val inputfile = sc.textFile("input.txt")
ワードカウント変換を実行する
私たちの目的は、ファイル内の単語を数えることです。各行を単語に分割するためのフラットマップを作成します(flatMap(line ⇒ line.split(“ ”))。
次に、各単語を値を持つキーとして読み取ります ‘1’ (<キー、値> = <単語、1>)マップ関数を使用(map(word ⇒ (word, 1))。
最後に、同様のキーの値を追加して、これらのキーを減らします(reduceByKey(_+_))。
次のコマンドは、ワードカウントロジックを実行するために使用されます。これを実行した後、これはアクションではなく、変換であるため、出力は見つかりません。新しいRDDを指定するか、指定されたデータをどう処理するかをスパークに指示します)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
現在のRDD
RDDの操作中に、現在のRDDについて知りたい場合は、次のコマンドを使用します。現在のRDDとそのデバッグ用の依存関係についての説明が表示されます。
scala> counts.toDebugString
変換のキャッシュ
RDDのpersist()またはcache()メソッドを使用して、RDDを永続化するようにマークできます。アクションで初めて計算されるときは、ノードのメモリに保持されます。次のコマンドを使用して、中間変換をメモリに保存します。
scala> counts.cache()
アクションの適用
すべての変換を保存するなどのアクションを適用すると、結果がテキストファイルになります。saveAsTextFile(“”)メソッドのString引数は、出力フォルダーの絶対パスです。次のコマンドを試して、出力をテキストファイルに保存します。次の例では、「output」フォルダーが現在の場所にあります。
scala> counts.saveAsTextFile("output")
出力の確認
別の端末を開いてホームディレクトリに移動します(他の端末でsparkが実行されます)。出力ディレクトリを確認するには、次のコマンドを使用します。
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
次のコマンドを使用して、からの出力を確認します。 Part-00000 ファイル。
[hadoop@localhost output]$ cat part-00000
出力
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
次のコマンドを使用して、からの出力を確認します。 Part-00001 ファイル。
[hadoop@localhost output]$ cat part-00001
出力
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
UN Persist the Storage
UN-persistingの前に、このアプリケーションに使用されているストレージスペースを確認する場合は、ブラウザで次のURLを使用してください。
http://localhost:4040
次の画面が表示されます。これは、Sparkシェルで実行されているアプリケーションに使用されるストレージスペースを示しています。
特定のRDDのストレージスペースをUN-persistingする場合は、次のコマンドを使用します。
Scala> counts.unpersist()
次のような出力が表示されます-
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
ブラウザのストレージ容量を確認するには、次のURLを使用してください。
http://localhost:4040/
次の画面が表示されます。これは、Sparkシェルで実行されているアプリケーションに使用されるストレージスペースを示しています。