SparkSQL-DataFrames

DataFrameはデータの分散コレクションであり、名前付きの列に編成されています。概念的には、優れた最適化手法を備えたリレーショナルテーブルと同等です。

DataFrameは、Hiveテーブル、構造化データファイル、外部データベース、既存のRDDなどのさまざまなソースの配列から構築できます。このAPIは、最新のビッグデータおよびデータサイエンスアプリケーション向けに設計されました。DataFrame in R Programming そして Pandas in Python

DataFrameの機能

これがDataFrameのいくつかの特徴的な機能のセットです-

  • 単一ノードクラスターから大規模クラスターでキロバイトからペタバイトのサイズのデータ​​を処理する機能。

  • さまざまなデータ形式(Avro、csv、Elastic Search、Cassandra)とストレージシステム(HDFS、HIVEテーブル、mysqlなど)をサポートします。

  • Spark SQL Catalystオプティマイザー(ツリー変換フレームワーク)による最先端の最適化とコード生成。

  • Spark-Coreを介して、すべてのビッグデータツールおよびフレームワークと簡単に統合できます。

  • Python、Java、Scala、およびRプログラミング用のAPIを提供します。

SQLContext

SQLContextはクラスであり、SparkSQLの機能を初期化するために使用されます。SQLContextクラスオブジェクトを初期化するには、SparkContextクラスオブジェクト(sc)が必要です。

次のコマンドは、spark-shellを介してSparkContextを初期化するために使用されます。

$ spark-shell

デフォルトでは、SparkContextオブジェクトは次の名前で初期化されます sc スパークシェルが開始したとき。

次のコマンドを使用して、SQLContextを作成します。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

名前の付いたJSONファイルの従業員レコードの例を考えてみましょう employee.json。次のコマンドを使用して、DataFrame(df)を作成し、という名前のJSONドキュメントを読み取ります。employee.json 以下の内容で。

employee.json −このファイルを現在のディレクトリに配置します scala> ポインタがあります。

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

DataFrame操作

DataFrameは、構造化データ操作用のドメイン固有言語を提供します。ここでは、DataFrameを使用した構造化データ処理の基本的な例をいくつか紹介します。

以下の手順に従って、DataFrame操作を実行します-

JSONドキュメントを読む

まず、JSONドキュメントを読む必要があります。これに基づいて、(dfs)という名前のDataFrameを生成します。

次のコマンドを使用して、という名前のJSONドキュメントを読み取ります employee.json。データは、フィールド-id、name、およびageを持つテーブルとして表示されます。

scala> val dfs = sqlContext.read.json("employee.json")

Output −フィールド名はから自動的に取得されます employee.json

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

データを表示する

DataFrameのデータを表示する場合は、次のコマンドを使用します。

scala> dfs.show()

Output −従業員データを表形式で表示できます。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

printSchemaメソッドを使用する

DataFrameの構造(スキーマ)を確認する場合は、次のコマンドを使用します。

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

選択方法を使用する

次のコマンドを使用してフェッチします name-DataFrameの3つの列のうちの列。

scala> dfs.select("name").show()

Output −の値を確認できます name カラム。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

年齢フィルターを使用する

次のコマンドを使用して、23歳を超える(23歳を超える)従業員を検索します。

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

groupByメソッドを使用する

次のコマンドを使用して、同じ年齢の従業員の数をカウントします。

scala> dfs.groupBy("age").count().show()

Output −2人の従業員は23歳です。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

プログラムによるSQLクエリの実行

SQLContextを使用すると、アプリケーションはSQL関数の実行中にプログラムでSQLクエリを実行し、結果をDataFrameとして返します。

一般に、バックグラウンドで、SparkSQLは既存のRDDをDataFrameに変換するための2つの異なる方法をサポートします-

シニアいいえ 方法と説明
1 リフレクションを使用したスキーマの推測

このメソッドは、リフレクションを使用して、特定のタイプのオブジェクトを含むRDDのスキーマを生成します。

2 プログラムによるスキーマの指定

DataFrameを作成するための2番目の方法は、スキーマを構築して既存のRDDに適用できるようにするプログラムインターフェイスを使用することです。