Spark SQL - DataFrames

Ein DataFrame ist eine verteilte Sammlung von Daten, die in benannten Spalten organisiert ist. Konzeptionell entspricht es relationalen Tabellen mit guten Optimierungstechniken.

Ein DataFrame kann aus einer Reihe verschiedener Quellen wie Hive-Tabellen, strukturierten Datendateien, externen Datenbanken oder vorhandenen RDDs erstellt werden. Diese API wurde für moderne Big Data- und Data Science-Anwendungen entwickelt, die sich von diesen inspirieren lassenDataFrame in R Programming und Pandas in Python.

Funktionen von DataFrame

Hier sind einige charakteristische Merkmale von DataFrame:

  • Möglichkeit, die Daten in der Größe von Kilobyte bis Petabyte auf einem einzelnen Knotencluster zu einem großen Cluster zu verarbeiten.

  • Unterstützt verschiedene Datenformate (Avro, CSV, elastische Suche und Cassandra) und Speichersysteme (HDFS, HIVE-Tabellen, MySQL usw.).

  • Modernste Optimierung und Codegenerierung mit dem Spark SQL Catalyst-Optimierer (Tree Transformation Framework).

  • Kann über Spark-Core problemlos in alle Big Data-Tools und -Frameworks integriert werden.

  • Bietet API für Python-, Java-, Scala- und R-Programmierung.

SQLContext

SQLContext ist eine Klasse und wird zum Initialisieren der Funktionen von Spark SQL verwendet. Das SparkContext-Klassenobjekt (sc) wird zum Initialisieren des SQLContext-Klassenobjekts benötigt.

Der folgende Befehl wird zum Initialisieren des SparkContext über die Spark-Shell verwendet.

$ spark-shell

Standardmäßig wird das SparkContext-Objekt mit dem Namen initialisiert sc wenn die Funkenschale startet.

Verwenden Sie den folgenden Befehl, um SQLContext zu erstellen.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

Beispiel

Betrachten wir ein Beispiel für Mitarbeiterdatensätze in einer JSON-Datei mit dem Namen employee.json. Verwenden Sie die folgenden Befehle, um einen DataFrame (df) zu erstellen und ein JSON-Dokument mit dem Namen zu lesenemployee.json mit folgendem Inhalt.

employee.json - Legen Sie diese Datei in dem Verzeichnis ab, in dem sich die aktuelle befindet scala> Zeiger befindet sich.

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

DataFrame-Operationen

DataFrame bietet eine domänenspezifische Sprache für die strukturierte Datenmanipulation. Hier finden Sie einige grundlegende Beispiele für die strukturierte Datenverarbeitung mit DataFrames.

Führen Sie die folgenden Schritte aus, um DataFrame-Vorgänge auszuführen.

Lesen Sie das JSON-Dokument

Zuerst müssen wir das JSON-Dokument lesen. Generieren Sie darauf basierend einen DataFrame mit dem Namen (dfs).

Verwenden Sie den folgenden Befehl, um das genannte JSON-Dokument zu lesen employee.json. Die Daten werden als Tabelle mit den Feldern ID, Name und Alter angezeigt.

scala> val dfs = sqlContext.read.json("employee.json")

Output - Die Feldnamen werden automatisch übernommen employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Zeigen Sie die Daten an

Wenn Sie die Daten im DataFrame anzeigen möchten, verwenden Sie den folgenden Befehl.

scala> dfs.show()

Output - Sie können die Mitarbeiterdaten in Tabellenform anzeigen.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Verwenden Sie die printSchema-Methode

Wenn Sie die Struktur (das Schema) des DataFrame anzeigen möchten, verwenden Sie den folgenden Befehl.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Verwenden Sie die Auswahlmethode

Verwenden Sie zum Abrufen den folgenden Befehl name-Spalte unter drei Spalten aus dem DataFrame.

scala> dfs.select("name").show()

Output - Sie können die Werte der sehen name Säule.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Verwenden Sie den Altersfilter

Verwenden Sie den folgenden Befehl, um die Mitarbeiter zu finden, deren Alter größer als 23 Jahre ist (Alter> 23 Jahre).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Verwenden Sie die groupBy-Methode

Verwenden Sie den folgenden Befehl, um die Anzahl der gleichaltrigen Mitarbeiter zu zählen.

scala> dfs.groupBy("age").count().show()

Output - Zwei Mitarbeiter sind 23 Jahre alt.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Programmgesteuertes Ausführen von SQL-Abfragen

Ein SQLContext ermöglicht es Anwendungen, SQL-Abfragen programmgesteuert auszuführen, während SQL-Funktionen ausgeführt werden, und gibt das Ergebnis als DataFrame zurück.

Im Hintergrund unterstützt SparkSQL im Allgemeinen zwei verschiedene Methoden zum Konvertieren vorhandener RDDs in DataFrames:

Sr. Nr Methoden & Beschreibung
1 Ableiten des Schemas mithilfe von Reflection

Diese Methode generiert mithilfe der Reflexion das Schema einer RDD, die bestimmte Objekttypen enthält.

2 Programmgesteuertes Festlegen des Schemas

Die zweite Methode zum Erstellen von DataFrame ist die programmgesteuerte Schnittstelle, mit der Sie ein Schema erstellen und dann auf eine vorhandene RDD anwenden können.