ApacheSpark-クイックガイド

業界は、データセットを分析するためにHadoopを幅広く使用しています。その理由は、Hadoopフレームワークが単純なプログラミングモデル(MapReduce)に基づいており、スケーラブルで柔軟性があり、フォールトトレラントで費用効果の高いコンピューティングソリューションを可能にするためです。ここでの主な関心事は、クエリ間の待機時間とプログラムの実行待機時間の観点から、大規模なデータセットの処理速度を維持することです。

Sparkは、Hadoop計算コンピューティングソフトウェアプロセスを高速化するためにApache SoftwareFoundationによって導入されました。

一般的な信念に反して、 Spark is not a modified version of Hadoop独自のクラスター管理があるため、実際にはHadoopに依存していません。Hadoopは、Sparkを実装する方法の1つにすぎません。

SparkはHadoopを2つの方法で使用します–1つは storage そして2番目は processing。Sparkには独自のクラスター管理計算があるため、Hadoopはストレージ目的でのみ使用されます。

Apache Spark

Apache Sparkは、高速計算用に設計された超高速クラスターコンピューティングテクノロジーです。これはHadoopMapReduceに基づいており、MapReduceモデルを拡張して、インタラクティブなクエリやストリーム処理など、より多くの種類の計算に効率的に使用できるようにします。Sparkの主な機能はin-memory cluster computing これにより、アプリケーションの処理速度が向上します。

Sparkは、バッチアプリケーション、反復アルゴリズム、インタラクティブクエリ、ストリーミングなど、幅広いワークロードをカバーするように設計されています。それぞれのシステムでこれらすべてのワークロードをサポートするだけでなく、個別のツールを維持する管理上の負担を軽減します。

ApacheSparkの進化

Sparkは、2009年にカリフォルニア大学バークレー校のAMPLabでMateiZahariaによって開発されたHadoopのサブプロジェクトの1つです。2010年にBSDライセンスの下でオープンソース化されました。2013年にApacheソフトウェア財団に寄付され、現在、ApacheSparkは2014年2月からトップレベルのApacheプロジェクトになりました。

ApacheSparkの機能

ApacheSparkには次の機能があります。

  • Speed− Sparkは、Hadoopクラスターでアプリケーションを実行するのに役立ちます。メモリでは最大100倍、ディスクで実行すると10倍高速になります。これは、ディスクへの読み取り/書き込み操作の数を減らすことで可能になります。中間処理データをメモリに保存します。

  • Supports multiple languages− Sparkは、Java、Scala、またはPythonの組み込みAPIを提供します。したがって、さまざまな言語でアプリケーションを作成できます。Sparkには、インタラクティブクエリ用の80個の高レベル演算子が用意されています。

  • Advanced Analytics− Sparkは、「Map」と「reduce」をサポートするだけではありません。また、SQLクエリ、ストリーミングデータ、機械学習(ML)、グラフアルゴリズムもサポートしています。

SparkはHadoop上に構築されています

次の図は、Hadoopコンポーネントを使用してSparkを構築する3つの方法を示しています。

以下で説明するように、Sparkのデプロイには3つの方法があります。

  • Standalone− Sparkスタンドアロン展開とは、SparkがHDFS(Hadoop分散ファイルシステム)の最上位を占め、HDFSに明示的にスペースが割り当てられることを意味します。ここでは、SparkとMapReduceが並行して実行され、クラスター上のすべてのSparkジョブをカバーします。

  • Hadoop Yarn− Hadoop Yarnのデプロイとは、単純に、事前インストールやルートアクセスを必要とせずにYarnでSparkが実行されることを意味します。SparkをHadoopエコシステムまたはHadoopスタックに統合するのに役立ちます。これにより、他のコンポーネントをスタック上で実行できます。

  • Spark in MapReduce (SIMR)− MapReduceのSparkは、スタンドアロン展開に加えて、sparkジョブを起動するために使用されます。SIMRを使用すると、ユーザーはSparkを起動し、管理アクセスなしでそのシェルを使用できます。

Sparkのコンポーネント

次の図は、Sparkのさまざまなコンポーネントを示しています。

Apache Spark Core

Spark Coreは、他のすべての機能が構築されているSparkプラットフォームの基盤となる一般的な実行エンジンです。インメモリコンピューティングと外部ストレージシステムのデータセットの参照を提供します。

Spark SQL

Spark SQLは、SchemaRDDと呼ばれる新しいデータ抽象化を導入するSpark Core上のコンポーネントであり、構造化データと半構造化データのサポートを提供します。

Sparkストリーミング

Spark Streamingは、SparkCoreの高速スケジューリング機能を利用してストリーミング分析を実行します。ミニバッチにデータを取り込み、それらのデータのミニバッチに対してRDD(Resilient Distributed Datasets)変換を実行します。

MLlib(機械学習ライブラリ)

MLlibは、分散メモリベースのSparkアーキテクチャにより、Sparkより上の分散機械学習フレームワークです。ベンチマークによると、これはMLlib開発者によってAlternating Least Squares(ALS)の実装に対して行われます。Spark MLlibは、Hadoopディスクベースバージョンの9倍の速度です。Apache Mahout (MahoutがSparkインターフェイスを取得する前)。

GraphX

GraphXは、Spark上にある分散グラフ処理フレームワークです。Pregel抽象化APIを使用してユーザー定義のグラフをモデル化できるグラフ計算を表現するためのAPIを提供します。また、この抽象化のために最適化されたランタイムを提供します。

復元力のある分散データセット

復元力のある分散データセット(RDD)は、Sparkの基本的なデータ構造です。これは、オブジェクトの不変の分散コレクションです。RDDの各データセットは論理パーティションに分割され、クラスターのさまざまなノードで計算できます。RDDには、ユーザー定義クラスを含む、任意のタイプのPython、Java、またはScalaオブジェクトを含めることができます。

正式には、RDDは読み取り専用のパーティション化されたレコードのコレクションです。RDDは、安定したストレージ上のデータまたは他のRDDのいずれかに対する決定論的操作によって作成できます。RDDは、並行して操作できるフォールトトレラントな要素のコレクションです。

RDDを作成する方法は2つあります- parallelizing ドライバープログラムの既存のコレクション、または referencing a dataset 共有ファイルシステム、HDFS、HBase、またはHadoop入力形式を提供する任意のデータソースなどの外部ストレージシステム内。

Sparkは、RDDの概念を利用して、より高速で効率的なMapReduce操作を実現します。まず、MapReduce操作がどのように行われるのか、そしてなぜそれらがそれほど効率的でないのかについて説明しましょう。

MapReduceでのデータ共有が遅い

MapReduceは、クラスター上で並列分散アルゴリズムを使用して大規模なデータセットを処理および生成するために広く採用されています。これにより、ユーザーは、作業の分散やフォールトトレランスを気にすることなく、一連の高レベルの演算子を使用して並列計算を記述できます。

残念ながら、現在のほとんどのフレームワークでは、計算間(Ex − 2つのMapReduceジョブ間)でデータを再利用する唯一の方法は、外部の安定したストレージシステム(Ex − HDFS)にデータを書き込むことです。このフレームワークは、クラスターの計算リソースにアクセスするための多数の抽象化を提供しますが、ユーザーはさらに多くのことを望んでいます。

どちらも Iterative そして Interactiveアプリケーションでは、並列ジョブ間でのより高速なデータ共有が必要です。MapReduceでのデータ共有は、replication, serialization、および disk IO。ストレージシステムに関しては、ほとんどのHadoopアプリケーションで、90%以上の時間をHDFSの読み取り/書き込み操作に費やしています。

MapReduceの反復操作

多段階アプリケーションで複数の計算にわたって中間結果を再利用します。次の図は、MapReduceで反復操作を実行しているときに、現在のフレームワークがどのように機能するかを説明しています。これにより、データレプリケーション、ディスクI / O、およびシリアル化によりかなりのオーバーヘッドが発生し、システムの速度が低下します。

MapReduceのインタラクティブ操作

ユーザーは、データの同じサブセットに対してアドホッククエリを実行します。各クエリは、安定したストレージでディスクI / Oを実行します。これは、アプリケーションの実行時間を支配する可能性があります。

次の図は、MapReduceでインタラクティブクエリを実行しているときに現在のフレームワークがどのように機能するかを説明しています。

SparkRDDを使用したデータ共有

MapReduceでのデータ共有は、 replication, serialization、および disk IO。ほとんどのHadoopアプリケーションは、90%以上の時間をHDFSの読み取り/書き込み操作に費やしています。

この問題を認識して、研究者はApacheSparkと呼ばれる特殊なフレームワークを開発しました。スパークの重要なアイデアはR弾力性 D配布 Dアタセット(RDD); インメモリ処理の計算をサポートします。つまり、メモリの状態をジョブ間でオブジェクトとして保存し、オブジェクトはそれらのジョブ間で共有可能です。メモリ内のデータ共有は、ネットワークやディスクよりも10倍から100倍高速です。

ここで、SparkRDDで反復的でインタラクティブな操作がどのように行われるかを調べてみましょう。

SparkRDDでの反復操作

以下の図は、SparkRDDでの反復操作を示しています。中間結果を安定ストレージ(ディスク)ではなく分散メモリに保存し、システムを高速化します。

Note −分散メモリ(RAM)が中間結果(JOBの状態)を保存するのに十分である場合、それらの結果をディスクに保存します。

SparkRDDでのインタラクティブな操作

この図は、SparkRDDでのインタラクティブな操作を示しています。同じデータセットに対して異なるクエリが繰り返し実行される場合、この特定のデータをメモリに保持して、実行時間を短縮できます。

デフォルトでは、変換された各RDDは、アクションを実行するたびに再計算される場合があります。ただし、persistメモリ内のRDD。この場合、Sparkは、次回クエリを実行したときに、要素をクラスター上に保持して、はるかに高速なアクセスを実現します。ディスク上にRDDを永続化すること、または複数のノード間で複製することもサポートされています。

SparkはHadoopのサブプロジェクトです。したがって、SparkをLinuxベースのシステムにインストールすることをお勧めします。次の手順は、ApacheSparkをインストールする方法を示しています。

ステップ1:Javaインストールの確認

Javaのインストールは、Sparkをインストールする際に必須のことの1つです。次のコマンドを試して、JAVAのバージョンを確認してください。

$java -version

Javaがすでにシステムにインストールされている場合は、次の応答が表示されます。

java version "1.7.0_71" 
Java(TM) SE Runtime Environment (build 1.7.0_71-b13) 
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

システムにJavaがインストールされていない場合は、次の手順に進む前にJavaをインストールしてください。

ステップ2:Scalaのインストールを確認する

Sparkを実装するにはScala言語が必要です。それでは、次のコマンドを使用してScalaのインストールを確認しましょう。

$scala -version

Scalaがすでにシステムにインストールされている場合は、次の応答が表示されます-

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

システムにScalaがインストールされていない場合は、Scalaのインストールの次のステップに進みます。

ステップ3:Scalaをダウンロードする

次のリンクにアクセスして、Scalaの最新バージョンをダウンロードしてください。Scalaをダウンロードしてください。このチュートリアルでは、scala-2.11.6バージョンを使用しています。ダウンロード後、ダウンロードフォルダにScalatarファイルがあります。

ステップ4:Scalaをインストールする

Scalaをインストールするには、以下の手順に従ってください。

Scalatarファイルを抽出します

Scala tarファイルを抽出するには、次のコマンドを入力します。

$ tar xvf scala-2.11.6.tgz

Scalaソフトウェアファイルを移動する

Scalaソフトウェアファイルをそれぞれのディレクトリに移動するには、次のコマンドを使用します (/usr/local/scala)

$ su – 
Password: 
# cd /home/Hadoop/Downloads/ 
# mv scala-2.11.6 /usr/local/scala 
# exit

ScalaのPATHを設定する

ScalaのPATHを設定するには、次のコマンドを使用します。

$ export PATH = $PATH:/usr/local/scala/bin

Scalaのインストールの確認

インストール後、確認することをお勧めします。Scalaのインストールを確認するには、次のコマンドを使用します。

$scala -version

Scalaがすでにシステムにインストールされている場合は、次の応答が表示されます-

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

ステップ5:ApacheSparkをダウンロードする

次のリンクにアクセスして、Sparkの最新バージョンをダウンロードします。Sparkのダウンロード。このチュートリアルでは、spark-1.3.1-bin-hadoop2.6バージョン。ダウンロード後、ダウンロードフォルダにSparktarファイルがあります。

ステップ6:Sparkをインストールする

Sparkをインストールするには、以下の手順に従ってください。

スパークタールの抽出

Sparktarファイルを抽出するための次のコマンド。

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Sparkソフトウェアファイルの移動

Sparkソフトウェアファイルをそれぞれのディレクトリに移動するための次のコマンド (/usr/local/spark)

$ su – 
Password:  

# cd /home/Hadoop/Downloads/ 
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark 
# exit

Sparkの環境をセットアップする

〜に次の行を追加します/.bashrcファイル。これは、sparkソフトウェアファイルが配置されている場所をPATH変数に追加することを意味します。

export PATH=$PATH:/usr/local/spark/bin

〜/ .bashrcファイルを取得するには、次のコマンドを使用します。

$ source ~/.bashrc

ステップ7:Sparkのインストールを確認する

Sparkシェルを開くには次のコマンドを記述します。

$spark-shell

Sparkが正常にインストールされると、次の出力が表示されます。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc  
scala>

SparkCoreはプロジェクト全体の基盤です。分散タスクのディスパッチ、スケジューリング、および基本的なI / O機能を提供します。Sparkは、マシン間でパーティション化されたデータの論理コレクションであるRDD(Resilient Distributed Datasets)と呼ばれる特殊な基本データ構造を使用します。RDDは2つの方法で作成できます。1つは外部ストレージシステムのデータセットを参照する方法で、もう1つは既存のRDDに変換(マップ、フィルター、レデューサー、結合など)を適用する方法です。

RDDの抽象化は、言語統合APIを介して公開されます。これにより、アプリケーションがRDDを操作する方法が、データのローカルコレクションを操作する方法と似ているため、プログラミングの複雑さが簡素化されます。

Spark Shell

Sparkは、インタラクティブなシェルを提供します。これは、データをインタラクティブに分析するための強力なツールです。ScalaまたはPython言語で利用できます。Sparkの主な抽象化は、Resilient Distributed Dataset(RDD)と呼ばれるアイテムの分散コレクションです。RDDは、Hadoop入力形式(HDFSファイルなど)から、または他のRDDを変換することによって作成できます。

SparkShellを開く

次のコマンドを使用してSparkシェルを開きます。

$ spark-shell

簡単なRDDを作成する

テキストファイルから簡単なRDDを作成しましょう。次のコマンドを使用して、単純なRDDを作成します。

scala> val inputfile = sc.textFile(“input.txt”)

上記のコマンドの出力は次のとおりです。

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDDAPIはいくつかを導入します Transformations そして少数 Actions RDDを操作します。

RDD変換

RDD変換は、新しいRDDへのポインターを返し、RDD間の依存関係を作成できるようにします。依存関係チェーン(依存関係の文字列)内の各RDDには、データを計算するための関数があり、親RDDへのポインター(依存関係)があります。

Sparkは怠惰なので、ジョブの作成と実行をトリガーする変換またはアクションを呼び出さない限り、何も実行されません。ワードカウントの例の次のスニペットを見てください。

したがって、RDD変換はデータのセットではなく、Sparkにデータの取得方法とその処理方法を指示するプログラムのステップ(唯一のステップである可能性があります)です。

S.No 変換と意味
1

map(func)

ソースの各要素を関数に渡すことによって形成された、新しい分散データセットを返します func

2

filter(func)

ソースの要素を選択して形成された新しいデータセットを返します。 func trueを返します。

3

flatMap(func)

mapに似ていますが、各入力アイテムを0個以上の出力アイテムにマップできます(したがって、funcは単一のアイテムではなくSeqを返す必要があります)。

4

mapPartitions(func)

マップに似ていますが、RDDの各パーティション(ブロック)で個別に実行されるため、 func タイプTのRDDで実行する場合は、タイプIterator <T>⇒Iterator<U>である必要があります。

5

mapPartitionsWithIndex(func)

マップパーティションに似ていますが、 func パーティションのインデックスを表す整数値で、 func タイプTのRDDで実行する場合は、タイプ(Int、Iterator <T>)⇒Iterator<U>である必要があります。

6

sample(withReplacement, fraction, seed)

サンプル fraction 与えられた乱数ジェネレータシードを使用した、置換の有無にかかわらず、データの。

7

union(otherDataset)

ソースデータセットと引数の要素の和集合を含む新しいデータセットを返します。

8

intersection(otherDataset)

ソースデータセットの要素と引数の共通部分を含む新しいRDDを返します。

9

distinct([numTasks])

ソースデータセットの個別の要素を含む新しいデータセットを返します。

10

groupByKey([numTasks])

(K、V)ペアのデータセットで呼び出されると、(K、Iterable <V>)ペアのデータセットを返します。

Note −各キーに対して集計(合計や平均など)を実行するためにグループ化する場合、reduceByKeyまたはaggregateByKeyを使用するとパフォーマンスが大幅に向上します。

11

reduceByKey(func, [numTasks])

(K、V)ペアのデータセットで呼び出されると、(K、V)ペアのデータセットを返します。ここで、各キーの値は、指定されたreduce関数funcを使用して集計されます。これは、タイプ(V、V)⇒Vである必要があります。 。groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数を使用して構成できます。

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

(K、V)ペアのデータセットで呼び出されると、(K、U)ペアのデータセットを返します。ここで、各キーの値は、指定された結合関数とニュートラルな「ゼロ」値を使用して集計されます。不要な割り当てを回避しながら、入力値タイプとは異なる集計値タイプを許可します。groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数を使用して構成できます。

13

sortByKey([ascending], [numTasks])

KがOrderedを実装する(K、V)ペアのデータセットで呼び出されると、ブール昇順引数で指定されているように、キーで昇順または降順でソートされた(K、V)ペアのデータセットを返します。

14

join(otherDataset, [numTasks])

タイプ(K、V)および(K、W)のデータセットで呼び出されると、各キーの要素のすべてのペアを含む(K、(V、W))ペアのデータセットを返します。外部結合は、leftOuterJoin、rightOuterJoin、およびfullOuterJoinを介してサポートされます。

15

cogroup(otherDataset, [numTasks])

タイプ(K、V)および(K、W)のデータセットで呼び出されると、(K、(Iterable <V>、Iterable <W>))タプルのデータセットを返します。この操作は、グループWithとも呼ばれます。

16

cartesian(otherDataset)

タイプTおよびUのデータセットで呼び出されると、(T、U)ペア(要素のすべてのペア)のデータセットを返します。

17

pipe(command, [envVars])

Perlまたはbashスクリプトなどのシェルコマンドを使用して、RDDの各パーティションをパイプします。RDD要素はプロセスのstdinに書き込まれ、そのstdoutに出力された行は文字列のRDDとして返されます。

18

coalesce(numPartitions)

RDD内のパーティションの数をnumPartitionsに減らします。大規模なデータセットをフィルタリングした後、操作をより効率的に実行するのに役立ちます。

19

repartition(numPartitions)

RDD内のデータをランダムに再シャッフルして、作成するパーティションの数を増やしたり減らしたりして、パーティション間でバランスを取ります。これにより、常にネットワーク上のすべてのデータがシャッフルされます。

20

repartitionAndSortWithinPartitions(partitioner)

指定されたパーティショナーに従ってRDDを再パーティション化し、結果の各パーティション内で、キーでレコードをソートします。これは、再パーティションを呼び出してから各パーティション内でソートするよりも効率的です。これは、ソートをシャッフル機構にプッシュダウンできるためです。

行動

S.No アクションと意味
1

reduce(func)

関数を使用してデータセットの要素を集約します func(これは2つの引数を取り、1つを返します)。関数は、並列で正しく計算できるように、可換で結合的である必要があります。

2

collect()

データセットのすべての要素をドライバープログラムで配列として返します。これは通常、データの十分に小さいサブセットを返すフィルターまたはその他の操作の後に役立ちます。

3

count()

データセット内の要素の数を返します。

4

first()

データセットの最初の要素を返します(take(1)と同様)。

5

take(n)

最初の配列を返します n データセットの要素。

6

takeSample (withReplacement,num, [seed])

のランダムサンプルを含む配列を返します num 置換の有無にかかわらず、データセットの要素。オプションで、乱数ジェネレータシードを事前に指定します。

7

takeOrdered(n, [ordering])

最初を返します n 自然な順序またはカスタムコンパレータのいずれかを使用するRDDの要素。

8

saveAsTextFile(path)

データセットの要素を、ローカルファイルシステム、HDFS、またはその他のHadoopでサポートされているファイルシステムの特定のディレクトリにテキストファイル(またはテキストファイルのセット)として書き込みます。Sparkは、各要素でtoStringを呼び出して、ファイル内のテキスト行に変換します。

9

saveAsSequenceFile(path) (Java and Scala)

データセットの要素を、ローカルファイルシステム、HDFS、またはその他のHadoopでサポートされているファイルシステムの特定のパスにHadoopSequenceFileとして書き込みます。これは、Hadoopの書き込み可能インターフェースを実装するキーと値のペアのRDDで利用できます。Scalaでは、暗黙的に書き込み可能に変換可能な型でも使用できます(Sparkには、Int、Double、Stringなどの基本型の変換が含まれます)。

10

saveAsObjectFile(path) (Java and Scala)

Javaシリアル化を使用してデータセットの要素を単純な形式で書き込み、SparkContext.objectFile()を使用してロードできます。

11

countByKey()

タイプ(K、V)のRDDでのみ使用できます。(K、Int)ペアのハッシュマップと各キーの数を返します。

12

foreach(func)

関数を実行します funcデータセットの各要素。これは通常、アキュムレータの更新や外部ストレージシステムとの相互作用などの副作用のために行われます。

Note− foreach()の外部でアキュムレータ以外の変数を変更すると、未定義の動作が発生する可能性があります。詳細については、クロージャについてを参照してください。

RDDを使用したプログラミング

例を使用して、RDDプログラミングでのいくつかのRDD変換とアクションの実装を見てみましょう。

単語カウントの例を考えてみましょう-ドキュメントに表示される各単語をカウントします。次のテキストを入力として考え、として保存されますinput.txt ホームディレクトリ内のファイル。

input.txt −入力ファイル。

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

以下の手順に従って、所定の例を実行してください。

Spark-Shellを開く

次のコマンドを使用して、スパークシェルを開きます。通常、sparkはScalaを使用して構築されます。したがって、SparkプログラムはScala環境で実行されます。

$ spark-shell

Sparkシェルが正常に開くと、次の出力が表示されます。出力の最後の行を見てください。「scとして利用可能なSparkコンテキスト」は、Sparkコンテナが自動的に作成された名前のSparkコンテキストオブジェクトを意味します。sc。プログラムの最初のステップを開始する前に、SparkContextオブジェクトを作成する必要があります。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

RDDを作成する

まず、Spark-Scala APIを使用して入力ファイルを読み取り、RDDを作成する必要があります。

次のコマンドは、指定された場所からファイルを読み取るために使用されます。ここでは、inputfileという名前で新しいRDDが作成されます。textFile(“”)メソッドで引数として指定される文字列は、入力ファイル名の絶対パスです。ただし、ファイル名のみが指定されている場合は、入力ファイルが現在の場所にあることを意味します。

scala> val inputfile = sc.textFile("input.txt")

ワードカウント変換を実行する

私たちの目的は、ファイル内の単語を数えることです。各行を単語に分割するためのフラットマップを作成します(flatMap(line ⇒ line.split(“ ”))。

次に、各単語を値を持つキーとして読み取ります ‘1’ (<キー、値> = <単語、1>)マップ関数を使用(map(word ⇒ (word, 1))。

最後に、同様のキーの値を追加して、これらのキーを減らします(reduceByKey(_+_))。

次のコマンドは、ワードカウントロジックを実行するために使用されます。これを実行した後、これはアクションではなく、変換であるため、出力は見つかりません。新しいRDDを指定するか、指定されたデータをどう処理するかをスパークに指示します)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

現在のRDD

RDDの操作中に、現在のRDDについて知りたい場合は、次のコマンドを使用します。現在のRDDとそのデバッグ用の依存関係についての説明が表示されます。

scala> counts.toDebugString

変換のキャッシュ

RDDのpersist()またはcache()メソッドを使用して、RDDを永続化するようにマークできます。アクションで初めて計算されるときは、ノードのメモリに保持されます。次のコマンドを使用して、中間変換をメモリに保存します。

scala> counts.cache()

アクションの適用

すべての変換を保存するなどのアクションを適用すると、結果がテキストファイルになります。saveAsTextFile(“”)メソッドのString引数は、出力フォルダーの絶対パスです。次のコマンドを試して、出力をテキストファイルに保存します。次の例では、「output」フォルダーが現在の場所にあります。

scala> counts.saveAsTextFile("output")

出力の確認

別の端末を開いてホームディレクトリに移動します(他の端末でsparkが実行されます)。出力ディレクトリを確認するには、次のコマンドを使用します。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

次のコマンドを使用して、からの出力を確認します。 Part-00000 ファイル。

[hadoop@localhost output]$ cat part-00000

出力

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

次のコマンドを使用して、からの出力を確認します。 Part-00001 ファイル。

[hadoop@localhost output]$ cat part-00001

出力

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

UN Persist the Storage

UN-persistingの前に、このアプリケーションに使用されているストレージスペースを確認する場合は、ブラウザで次のURLを使用してください。

http://localhost:4040

次の画面が表示されます。これは、Sparkシェルで実行されているアプリケーションに使用されるストレージスペースを示しています。

特定のRDDのストレージスペースをUN-persistingする場合は、次のコマンドを使用します。

Scala> counts.unpersist()

次のような出力が表示されます-

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

ブラウザのストレージ容量を確認するには、次のURLを使用してください。

http://localhost:4040/

次の画面が表示されます。これは、Sparkシェルで実行されているアプリケーションに使用されるストレージスペースを示しています。

Spark-submitを使用するSparkアプリケーションは、Sparkアプリケーションをクラスターにデプロイするために使用されるシェルコマンドです。統一されたインターフェイスを介して、それぞれのクラスターマネージャーをすべて使用します。したがって、それぞれにアプリケーションを構成する必要はありません。

シェルコマンドを使用して、以前に使用したのと同じ単語数の例を見てみましょう。ここでは、sparkアプリケーションと同じ例を検討します。

サンプル入力

次のテキストは入力データであり、という名前のファイルは in.txt

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

次のプログラムを見てください-

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

上記のプログラムをという名前のファイルに保存します SparkWordCount.scala そしてそれをという名前のユーザー定義ディレクトリに置きます spark-application

Note − inputRDDをcountRDDに変換する際、(テキストファイルからの)行を単語にトークン化するためのflatMap()、単語の頻度をカウントするためのmap()メソッド、および各単語の繰り返しをカウントするためのreduceByKey()メソッドを使用しています。

この申請書を提出するには、次の手順を使用してください。のすべてのステップを実行しますspark-application ターミナルを介してディレクトリ。

ステップ1:SparkJaをダウンロードする

コンパイルにはSparkコアjarが必要です。したがって、次のリンクからspark-core_2.10-1.3.0.jarをダウンロードします。Sparkコアjarをダウンロードディレクトリから次の場所に移動します。spark-application ディレクトリ。

ステップ2:プログラムをコンパイルする

以下のコマンドを使用して、上記のプログラムをコンパイルします。このコマンドは、spark-applicationディレクトリから実行する必要があります。ここに、/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar Sparkライブラリから取得したHadoopサポートjarです。

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

ステップ3:JARを作成する

次のコマンドを使用して、sparkアプリケーションのjarファイルを作成します。ここに、wordcount jarファイルのファイル名です。

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

ステップ4:Sparkアプリケーションを送信する

次のコマンドを使用してsparkアプリケーションを送信します-

spark-submit --class SparkWordCount --master local wordcount.jar

正常に実行されると、以下の出力が表示されます。ザ・OK次の出力を入力するのはユーザー識別用であり、それがプログラムの最後の行です。次の出力を注意深く読むと、次のようなさまざまなことがわかります。

  • ポート42954でサービス「sparkDriver」を正常に開始しました
  • MemoryStoreは容量267.3MBで開始しました
  • http://192.168.1.217:4040でSparkUIを開始しました
  • 追加されたJARファイル:/home/hadoop/piapplication/count.jar
  • ResultStage 1(SparkPi.scala:11のsaveAsTextFile)は0.566秒で終了しました
  • http://192.168.1.217:4040でSparkWebUIを停止しました
  • MemoryStoreがクリアされました
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

ステップ5:出力を確認する

プログラムが正常に実行されると、次の名前のディレクトリが見つかります。 outfile spark-applicationディレクトリにあります。

次のコマンドは、outfileディレクトリ内のファイルのリストを開いて確認するために使用されます。

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

出力をチェックするためのコマンド part-00000 ファイルは−

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

part-00001ファイルの出力を確認するためのコマンドは次のとおりです。

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

'spark-submit'コマンドの詳細については、次のセクションを参照してください。

Spark-submit構文

spark-submit [options] <app jar | python file> [app arguments]

オプション

S.No オプション 説明
1 - 主人 spark:// host:port、mesos:// host:port、yarn、またはlocal。
2 --deploy-mode ドライバープログラムをローカルで起動するか(「クライアント」)、クラスター内のワーカーマシンの1つで起動するか(「クラスター」)(デフォルト:クライアント)。
3 - クラス アプリケーションのメインクラス(Java / Scalaアプリの場合)。
4 - 名前 アプリケーションの名前。
5 --jars ドライバとエグゼキュータのクラスパスに含めるローカルjarのコンマ区切りのリスト。
6 -パッケージ ドライバーとエグゼキューターのクラスパスに含めるjarのMaven座標のコンマ区切りリスト。
7 -リポジトリ --packagesで指定されたMaven座標を検索するための追加のリモートリポジトリのコンマ区切りリスト。
8 --py-files PythonアプリのPYTHONPATHに配置する.zip、.egg、または.pyファイルのコンマ区切りのリスト。
9 -ファイル 各エグゼキュータの作業ディレクトリに配置されるファイルのコンマ区切りのリスト。
10 --conf(prop = val) 任意のSpark構成プロパティ。
11 --properties-file 追加のプロパティをロードするファイルへのパス。指定されていない場合、これはconf / spark-defaultsを検索します。
12 --driver-memory ドライバ用のメモリ(例:1000M、2G)(デフォルト:512M)。
13 --driver-java-options ドライバーに渡す追加のJavaオプション。
14 --driver-library-path ドライバに渡す追加のライブラリパスエントリ。
15 --driver-class-path

ドライバーに渡す追加のクラスパスエントリ。

--jarsで追加されたjarは、クラスパスに自動的に含まれることに注意してください。

16 --executor-memory エグゼキュータあたりのメモリ(例:1000M、2G)(デフォルト:1G)。
17 --proxy-user アプリケーションを送信するときに偽装するユーザー。
18 -ヘルプ、-h このヘルプメッセージを表示して終了します。
19 --verbose、-v 追加のデバッグ出力を出力します。
20 - バージョン 現在のSparkのバージョンを印刷します。
21 --driver-cores NUM ドライバーのコア(デフォルト:1)。
22 -監督する 指定されている場合、失敗時にドライバーを再起動します。
23 - 殺します 指定された場合、指定されたドライバーを強制終了します。
24 - 状態 指定された場合、指定されたドライバーのステータスを要求します。
25 --total-executor-cores すべてのエグゼキュータの合計コア。
26 --executor-cores エグゼキュータあたりのコア数。(デフォルト:YARNモードでは1、またはスタンドアロンモードではワーカーで使用可能なすべてのコア)。

Sparkには2つの異なるタイプの共有変数が含まれています-1つは broadcast variables そして2番目は accumulators

  • Broadcast variables −大きな値を効率的に分散するために使用されます。

  • Accumulators −特定のコレクションの情報を集約するために使用されます。

ブロードキャスト変数

ブロードキャスト変数を使用すると、プログラマーは、タスクと一緒にそのコピーを出荷するのではなく、読み取り専用変数を各マシンにキャッシュしておくことができます。これらは、たとえば、すべてのノードに大きな入力データセットのコピーを効率的に提供するために使用できます。Sparkはまた、通信コストを削減するために、効率的なブロードキャストアルゴリズムを使用してブロードキャスト変数を配布しようとします。

Sparkアクションは、分散された「シャッフル」操作によって分離された一連のステージを通じて実行されます。Sparkは、各ステージ内のタスクに必要な共通データを自動的にブロードキャストします。

この方法でブロードキャストされたデータは、シリアル化された形式でキャッシュされ、各タスクを実行する前に逆シリアル化されます。つまり、ブロードキャスト変数を明示的に作成することは、複数のステージにまたがるタスクが同じデータを必要とする場合、またはデータを逆シリアル化された形式でキャッシュすることが重要な場合にのみ役立ちます。

ブロードキャスト変数は変数から作成されます v 電話で SparkContext.broadcast(v)。ブロードキャスト変数はラッパーですv、およびその値には、を呼び出すことでアクセスできます。 value方法。以下のコードはこれを示しています-

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

Output

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

ブロードキャスト変数を作成したら、値の代わりに使用する必要があります v クラスタで実行されるすべての関数で、 vノードに複数回出荷されることはありません。さらに、オブジェクトv すべてのノードがブロードキャスト変数の同じ値を取得することを保証するために、ブロードキャスト後に変更しないでください。

アキュムレータ

アキュムレータは、結合演算によってのみ「追加」される変数であるため、並列で効率的にサポートできます。これらは、カウンター(MapReduceのように)または合計を実装するために使用できます。Sparkは数値型のアキュムレータをネイティブにサポートしており、プログラマーは新しい型のサポートを追加できます。アキュムレータが名前で作成されている場合、それらはに表示されますSpark’s UI。これは、実行中のステージの進行状況を理解するのに役立ちます(注-これはPythonではまだサポートされていません)。

アキュムレータは初期値から作成されます v 電話で SparkContext.accumulator(v)。クラスターで実行されているタスクは、クラスターを使用してクラスターに追加できます。addメソッドまたは+ =演算子(ScalaおよびPythonの場合)。ただし、その値を読み取ることはできません。ドライバプログラムのみが、アキュムレータの値を使用してアキュムレータの値を読み取ることができます。value 方法。

以下のコードは、配列の要素を合計するために使用されているアキュムレータを示しています-

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

上記のコードの出力を確認したい場合は、次のコマンドを使用します-

scala> accum.value

出力

res2: Int = 10

数値RDD演算

Sparkでは、事前定義されたAPIメソッドの1つを使用して、数値データに対してさまざまな操作を実行できます。Sparkの数値演算は、一度に1つの要素でモデルを構築できるストリーミングアルゴリズムを使用して実装されます。

これらの演算は計算され、として返されます。 StatusCounter 呼び出してオブジェクト status() 方法。

S.No 方法と意味
1

count()

RDD内の要素の数。

2

Mean()

RDD内の要素の平均。

3

Sum()

RDD内の要素の合計値。

4

Max()

RDD内のすべての要素の最大値。

5

Min()

RDD内のすべての要素の最小値。

6

Variance()

要素の分散。

7

Stdev()

標準偏差。

これらのメソッドの1つだけを使用する場合は、対応するメソッドをRDDで直接呼び出すことができます。