Spark SQL - DataFrames
Ein DataFrame ist eine verteilte Sammlung von Daten, die in benannten Spalten organisiert ist. Konzeptionell entspricht es relationalen Tabellen mit guten Optimierungstechniken.
Ein DataFrame kann aus einer Reihe verschiedener Quellen wie Hive-Tabellen, strukturierten Datendateien, externen Datenbanken oder vorhandenen RDDs erstellt werden. Diese API wurde für moderne Big Data- und Data Science-Anwendungen entwickelt, die sich von diesen inspirieren lassenDataFrame in R Programming und Pandas in Python.
Funktionen von DataFrame
Hier sind einige charakteristische Merkmale von DataFrame:
Möglichkeit, die Daten in der Größe von Kilobyte bis Petabyte auf einem einzelnen Knotencluster zu einem großen Cluster zu verarbeiten.
Unterstützt verschiedene Datenformate (Avro, CSV, elastische Suche und Cassandra) und Speichersysteme (HDFS, HIVE-Tabellen, MySQL usw.).
Modernste Optimierung und Codegenerierung mit dem Spark SQL Catalyst-Optimierer (Tree Transformation Framework).
Kann über Spark-Core problemlos in alle Big Data-Tools und -Frameworks integriert werden.
Bietet API für Python-, Java-, Scala- und R-Programmierung.
SQLContext
SQLContext ist eine Klasse und wird zum Initialisieren der Funktionen von Spark SQL verwendet. Das SparkContext-Klassenobjekt (sc) wird zum Initialisieren des SQLContext-Klassenobjekts benötigt.
Der folgende Befehl wird zum Initialisieren des SparkContext über die Spark-Shell verwendet.
$ spark-shell
Standardmäßig wird das SparkContext-Objekt mit dem Namen initialisiert sc wenn die Funkenschale startet.
Verwenden Sie den folgenden Befehl, um SQLContext zu erstellen.
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
Beispiel
Betrachten wir ein Beispiel für Mitarbeiterdatensätze in einer JSON-Datei mit dem Namen employee.json. Verwenden Sie die folgenden Befehle, um einen DataFrame (df) zu erstellen und ein JSON-Dokument mit dem Namen zu lesenemployee.json mit folgendem Inhalt.
employee.json - Legen Sie diese Datei in dem Verzeichnis ab, in dem sich die aktuelle befindet scala> Zeiger befindet sich.
{
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
}
DataFrame-Operationen
DataFrame bietet eine domänenspezifische Sprache für die strukturierte Datenmanipulation. Hier finden Sie einige grundlegende Beispiele für die strukturierte Datenverarbeitung mit DataFrames.
Führen Sie die folgenden Schritte aus, um DataFrame-Vorgänge auszuführen.
Lesen Sie das JSON-Dokument
Zuerst müssen wir das JSON-Dokument lesen. Generieren Sie darauf basierend einen DataFrame mit dem Namen (dfs).
Verwenden Sie den folgenden Befehl, um das genannte JSON-Dokument zu lesen employee.json. Die Daten werden als Tabelle mit den Feldern ID, Name und Alter angezeigt.
scala> val dfs = sqlContext.read.json("employee.json")
Output - Die Feldnamen werden automatisch übernommen employee.json.
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
Zeigen Sie die Daten an
Wenn Sie die Daten im DataFrame anzeigen möchten, verwenden Sie den folgenden Befehl.
scala> dfs.show()
Output - Sie können die Mitarbeiterdaten in Tabellenform anzeigen.
<console>:22, took 0.052610 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
| 23 | 1204 | javed |
| 23 | 1205 | prudvi |
+----+------+--------+
Verwenden Sie die printSchema-Methode
Wenn Sie die Struktur (das Schema) des DataFrame anzeigen möchten, verwenden Sie den folgenden Befehl.
scala> dfs.printSchema()
Output
root
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
Verwenden Sie die Auswahlmethode
Verwenden Sie zum Abrufen den folgenden Befehl name-Spalte unter drei Spalten aus dem DataFrame.
scala> dfs.select("name").show()
Output - Sie können die Werte der sehen name Säule.
<console>:22, took 0.044023 s
+--------+
| name |
+--------+
| satish |
| krishna|
| amith |
| javed |
| prudvi |
+--------+
Verwenden Sie den Altersfilter
Verwenden Sie den folgenden Befehl, um die Mitarbeiter zu finden, deren Alter größer als 23 Jahre ist (Alter> 23 Jahre).
scala> dfs.filter(dfs("age") > 23).show()
Output
<console>:22, took 0.078670 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
+----+------+--------+
Verwenden Sie die groupBy-Methode
Verwenden Sie den folgenden Befehl, um die Anzahl der gleichaltrigen Mitarbeiter zu zählen.
scala> dfs.groupBy("age").count().show()
Output - Zwei Mitarbeiter sind 23 Jahre alt.
<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 | 2 |
| 25 | 1 |
| 28 | 1 |
| 39 | 1 |
+----+-----+
Programmgesteuertes Ausführen von SQL-Abfragen
Ein SQLContext ermöglicht es Anwendungen, SQL-Abfragen programmgesteuert auszuführen, während SQL-Funktionen ausgeführt werden, und gibt das Ergebnis als DataFrame zurück.
Im Hintergrund unterstützt SparkSQL im Allgemeinen zwei verschiedene Methoden zum Konvertieren vorhandener RDDs in DataFrames:
Sr. Nr | Methoden & Beschreibung |
---|---|
1 | Ableiten des Schemas mithilfe von Reflection Diese Methode generiert mithilfe der Reflexion das Schema einer RDD, die bestimmte Objekttypen enthält. |
2 | Programmgesteuertes Festlegen des Schemas Die zweite Methode zum Erstellen von DataFrame ist die programmgesteuerte Schnittstelle, mit der Sie ein Schema erstellen und dann auf eine vorhandene RDD anwenden können. |