Spark SQL - фреймы данных

DataFrame - это распределенный набор данных, который организован в именованные столбцы. Концептуально это эквивалентно реляционным таблицам с хорошими методами оптимизации.

DataFrame может быть построен из массива различных источников, таких как таблицы Hive, файлы структурированных данных, внешние базы данных или существующие RDD. Этот API был разработан для современных приложений для работы с большими данными и науки о данных, основанных наDataFrame in R Programming и Pandas in Python.

Особенности DataFrame

Вот несколько характерных особенностей DataFrame -

  • Возможность обрабатывать данные размером от килобайт до петабайт на кластере с одним узлом и большим кластером.

  • Поддерживает различные форматы данных (Avro, csv, эластичный поиск и Cassandra) и системы хранения (HDFS, таблицы HIVE, mysql и т. Д.).

  • Современная оптимизация и генерация кода с помощью оптимизатора Spark SQL Catalyst (структура преобразования дерева).

  • Может быть легко интегрирован со всеми инструментами и фреймворками больших данных через Spark-Core.

  • Предоставляет API для программирования на Python, Java, Scala и R.

SQLContext

SQLContext - это класс, который используется для инициализации функций Spark SQL. Объект класса SparkContext (sc) требуется для инициализации объекта класса SQLContext.

Следующая команда используется для инициализации SparkContext через spark-shell.

$ spark-shell

По умолчанию объект SparkContext инициализируется именем sc когда запускается искровой снаряд.

Используйте следующую команду для создания SQLContext.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

пример

Давайте рассмотрим пример записей сотрудников в файле JSON с именем employee.json. Используйте следующие команды, чтобы создать DataFrame (df) и прочитать документ JSON с именемemployee.json со следующим содержанием.

employee.json - Поместите этот файл в каталог, в котором находится текущий scala> указатель находится.

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

Операции с DataFrame

DataFrame предоставляет предметно-ориентированный язык для управления структурированными данными. Здесь мы включаем несколько основных примеров обработки структурированных данных с использованием DataFrames.

Следуйте инструкциям ниже, чтобы выполнить операции DataFrame -

Прочтите документ JSON

Во-первых, мы должны прочитать документ JSON. На основе этого сгенерируйте DataFrame с именем (dfs).

Используйте следующую команду, чтобы прочитать документ JSON с именем employee.json. Данные отображаются в виде таблицы с полями - идентификатор, имя и возраст.

scala> val dfs = sqlContext.read.json("employee.json")

Output - Имена полей берутся автоматически из employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Показать данные

Если вы хотите увидеть данные в DataFrame, используйте следующую команду.

scala> dfs.show()

Output - Вы можете увидеть данные о сотрудниках в табличном формате.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Используйте метод printSchema

Если вы хотите увидеть структуру (схему) DataFrame, используйте следующую команду.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Использовать метод выбора

Используйте следующую команду для получения name-столбец среди трех столбцов из DataFrame.

scala> dfs.select("name").show()

Output - Вы можете увидеть значения name столбец.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Использовать возрастной фильтр

Используйте следующую команду для поиска сотрудников, возраст которых превышает 23 года (возраст> 23).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Использовать метод groupBy

Используйте следующую команду для подсчета количества сотрудников одного возраста.

scala> dfs.groupBy("age").count().show()

Output - двое сотрудников - 23 года.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Программный запуск SQL-запросов

SQLContext позволяет приложениям программно выполнять SQL-запросы при выполнении функций SQL и возвращает результат в виде DataFrame.

Как правило, в фоновом режиме SparkSQL поддерживает два разных метода преобразования существующих RDD в DataFrames:

Старший Нет Методы и описание
1 Вывод схемы с использованием отражения

Этот метод использует отражение для создания схемы RDD, содержащей определенные типы объектов.

2 Программное указание схемы

Второй метод создания DataFrame - это программный интерфейс, который позволяет вам создать схему и затем применить ее к существующему RDD.