Spark SQL - Краткое руководство

Отрасли широко используют Hadoop для анализа своих наборов данных. Причина в том, что фреймворк Hadoop основан на простой модели программирования (MapReduce) и обеспечивает масштабируемое, гибкое, отказоустойчивое и экономичное вычислительное решение. Здесь основная задача - поддерживать скорость обработки больших наборов данных с точки зрения времени ожидания между запросами и времени ожидания для запуска программы.

Spark был представлен Apache Software Foundation для ускорения программного процесса вычислительных вычислений Hadoop.

В отличие от общего убеждения, Spark is not a modified version of Hadoopи на самом деле не зависит от Hadoop, потому что имеет собственное управление кластером. Hadoop - это лишь один из способов реализации Spark.

Spark использует Hadoop двумя способами. storage и второй processing. Поскольку Spark имеет собственное вычисление для управления кластером, он использует Hadoop только для хранения.

Apache Spark

Apache Spark - это молниеносная технология кластерных вычислений, предназначенная для быстрых вычислений. Он основан на Hadoop MapReduce и расширяет модель MapReduce, чтобы эффективно использовать ее для большего количества типов вычислений, включая интерактивные запросы и потоковую обработку. Главная особенность Spark - этоin-memory cluster computing что увеличивает скорость обработки приложения.

Spark предназначен для охвата широкого спектра рабочих нагрузок, таких как пакетные приложения, итерационные алгоритмы, интерактивные запросы и потоковая передача. Помимо поддержки всех этих рабочих нагрузок в соответствующей системе, он снижает нагрузку на управление, связанную с поддержкой отдельных инструментов.

Эволюция Apache Spark

Spark - это один из подпроектов Hadoop, разработанный в 2009 году в AMPLab Калифорнийского университета в Беркли Матей Захария. Он был открыт в 2010 году по лицензии BSD. Он был передан в дар Фонду программного обеспечения Apache в 2013 году, и теперь Apache Spark стал проектом Apache верхнего уровня с февраля 2014 года.

Особенности Apache Spark

Apache Spark имеет следующие особенности.

  • Speed- Spark помогает запускать приложение в кластере Hadoop до 100 раз быстрее в памяти и в 10 раз быстрее при запуске на диске. Это возможно за счет уменьшения количества операций чтения / записи на диск. Он хранит данные промежуточной обработки в памяти.

  • Supports multiple languages- Spark предоставляет встроенные API на Java, Scala или Python. Поэтому вы можете писать приложения на разных языках. Spark предлагает 80 операторов высокого уровня для интерактивных запросов.

  • Advanced Analytics- Spark поддерживает не только «Карта» и «уменьшить». Он также поддерживает SQL-запросы, потоковые данные, машинное обучение (ML) и алгоритмы Graph.

Spark на основе Hadoop

На следующей схеме показаны три способа создания Spark с компонентами Hadoop.

Ниже описаны три способа развертывания Spark.

  • Standalone- Автономное развертывание Spark означает, что Spark занимает место поверх HDFS (распределенной файловой системы Hadoop), а пространство для HDFS выделяется явно. Здесь Spark и MapReduce будут работать бок о бок, чтобы охватить все искровые задания в кластере.

  • Hadoop Yarn- Развертывание Hadoop Yarn означает, что искра запускается на Yarn без предварительной установки или корневого доступа. Это помогает интегрировать Spark в экосистему Hadoop или стек Hadoop. Это позволяет другим компонентам работать поверх стека.

  • Spark in MapReduce (SIMR)- Spark в MapReduce используется для запуска Spark Job в дополнение к автономному развертыванию. С SIMR пользователь может запустить Spark и использовать его оболочку без какого-либо административного доступа.

Компоненты Spark

На следующем рисунке показаны различные компоненты Spark.

Ядро Apache Spark

Spark Core - это основной движок общего исполнения для платформы Spark, на которой построены все остальные функции. Он обеспечивает вычисления в памяти и ссылки на наборы данных во внешних системах хранения.

Spark SQL

Spark SQL - это компонент поверх Spark Core, который представляет новую абстракцию данных под названием SchemaRDD, которая обеспечивает поддержку структурированных и полуструктурированных данных.

Spark Streaming

Spark Streaming использует возможность быстрого планирования Spark Core для выполнения потоковой аналитики. Он принимает данные в мини-пакетах и ​​выполняет преобразования RDD (устойчивые распределенные наборы данных) для этих мини-пакетов данных.

MLlib (библиотека машинного обучения)

MLlib - это распределенная структура машинного обучения, превосходящая Spark, благодаря архитектуре Spark на основе распределенной памяти. Это, согласно тестам, сделано разработчиками MLlib для реализаций альтернативных наименьших квадратов (ALS). Spark MLlib в девять раз быстрее дисковой версии Hadoop.Apache Mahout (до того, как Mahout получил интерфейс Spark).

GraphX

GraphX ​​- это распределенная среда обработки графов поверх Spark. Он предоставляет API для выражения вычислений графов, которые могут моделировать определяемые пользователем графы с помощью API абстракции Pregel. Он также обеспечивает оптимизированную среду выполнения для этой абстракции.

Устойчивые распределенные наборы данных

Устойчивые распределенные наборы данных (RDD) - это фундаментальная структура данных Spark. Это неизменяемая распределенная коллекция объектов. Каждый набор данных в RDD разделен на логические разделы, которые могут быть вычислены на разных узлах кластера. СДР могут содержать любой тип объектов Python, Java или Scala, включая определяемые пользователем классы.

Формально RDD - это секционированная коллекция записей, доступная только для чтения. RDD могут быть созданы с помощью детерминированных операций либо с данными в стабильном хранилище, либо с другими RDD. RDD - это отказоустойчивый набор элементов, с которыми можно работать параллельно.

Есть два способа создания RDD: parallelizing существующая коллекция в вашей программе драйвера, или referencing a dataset во внешней системе хранения, например в общей файловой системе, HDFS, HBase или любом источнике данных, предлагающем входной формат Hadoop.

Spark использует концепцию RDD для более быстрых и эффективных операций MapReduce. Давайте сначала обсудим, как происходят операции MapReduce и почему они не так эффективны.

Обмен данными в MapReduce происходит медленно

MapReduce широко применяется для обработки и создания больших наборов данных с помощью параллельного распределенного алгоритма в кластере. Он позволяет пользователям писать параллельные вычисления, используя набор операторов высокого уровня, не беспокоясь о распределении работы и отказоустойчивости.

К сожалению, в большинстве современных платформ единственный способ повторно использовать данные между вычислениями (например, между двумя заданиями MapReduce) - это записать их во внешнюю стабильную систему хранения (например, HDFS). Хотя эта структура предоставляет множество абстракций для доступа к вычислительным ресурсам кластера, пользователи все же хотят большего.

И то и другое Iterative и Interactiveприложениям требуется более быстрый обмен данными между параллельными заданиями. Обмен данными в MapReduce происходит медленно из-заreplication, serialization, и disk IO. Что касается системы хранения, большинство приложений Hadoop тратят более 90% времени на выполнение операций чтения-записи HDFS.

Итерационные операции на MapReduce

Повторно используйте промежуточные результаты в нескольких вычислениях в многоэтапных приложениях. На следующем рисунке показано, как работает текущая структура при выполнении итерационных операций на MapReduce. Это влечет за собой значительные накладные расходы из-за репликации данных, дискового ввода-вывода и сериализации, что замедляет работу системы.

Интерактивные операции на MapReduce

Пользователь выполняет специальные запросы к одному и тому же подмножеству данных. Каждый запрос будет выполнять дисковый ввод-вывод в стабильном хранилище, что может доминировать над временем выполнения приложения.

На следующем рисунке показано, как работает текущая структура при выполнении интерактивных запросов на MapReduce.

Совместное использование данных с помощью Spark RDD

Обмен данными в MapReduce происходит медленно из-за replication, serialization, и disk IO. Большинство приложений Hadoop тратят более 90% времени на выполнение операций чтения-записи HDFS.

Осознавая эту проблему, исследователи разработали специализированный фреймворк под названием Apache Spark. Ключевая идея искрыRжизнеспособный Dраспределяется Dатасеты (RDD); он поддерживает вычисления в памяти. Это означает, что он сохраняет состояние памяти в виде объекта по всем заданиям, и этот объект может использоваться совместно этими заданиями. Обмен данными в памяти в 10–100 раз быстрее, чем в сети и на диске.

Давайте теперь попробуем выяснить, как итерационные и интерактивные операции происходят в Spark RDD.

Итерационные операции над Spark RDD

На приведенной ниже иллюстрации показаны итерационные операции в Spark RDD. Промежуточные результаты будут храниться в распределенной памяти вместо стабильного хранилища (диска), что сделает систему быстрее.

Note - Если распределенной памяти (ОЗУ) достаточно для хранения промежуточных результатов (состояние задания), то эти результаты будут сохранены на диске.

Интерактивные операции в Spark RDD

На этом рисунке показаны интерактивные операции в Spark RDD. Если к одному и тому же набору данных многократно выполняются разные запросы, эти конкретные данные можно хранить в памяти для увеличения времени выполнения.

По умолчанию каждый преобразованный RDD может пересчитываться каждый раз, когда вы запускаете над ним действие. Однако вы также можетеpersistRDD в памяти, и в этом случае Spark сохранит элементы в кластере для гораздо более быстрого доступа при следующем запросе. Также имеется поддержка сохранения RDD на диске или их репликации на нескольких узлах.

Spark - это подпроект Hadoop. Поэтому лучше установить Spark в систему на базе Linux. Следующие шаги показывают, как установить Apache Spark.

Шаг 1. Проверка установки Java

Установка Java - одно из обязательных при установке Spark. Попробуйте выполнить следующую команду, чтобы проверить версию JAVA.

$java -version

Если Java уже установлена ​​в вашей системе, вы увидите следующий ответ -

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

Если в вашей системе не установлена ​​Java, установите Java, прежде чем переходить к следующему шагу.

Шаг 2. Проверка установки Scala

Для реализации Spark вам нужен язык Scala. Итак, давайте проверим установку Scala, используя следующую команду.

$scala -version

Если Scala уже установлен в вашей системе, вы увидите следующий ответ:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Если в вашей системе не установлен Scala, переходите к следующему шагу по установке Scala.

Шаг 3: загрузка Scala

Загрузите последнюю версию Scala, перейдя по следующей ссылке Загрузить Scala . В этом руководстве мы используем версию scala-2.11.6. После загрузки вы найдете tar-файл Scala в папке загрузки.

Шаг 4: установка Scala

Следуйте приведенным ниже инструкциям по установке Scala.

Распакуйте tar-файл Scala

Введите следующую команду для извлечения tar-файла Scala.

$ tar xvf scala-2.11.6.tgz

Перемещение файлов программного обеспечения Scala

Используйте следующие команды для перемещения файлов программного обеспечения Scala в соответствующий каталог (/usr/local/scala).

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit

Установить PATH для Scala

Используйте следующую команду для установки PATH для Scala.

$ export PATH = $PATH:/usr/local/scala/bin

Проверка установки Scala

После установки лучше проверить. Используйте следующую команду для проверки установки Scala.

$scala -version

Если Scala уже установлен в вашей системе, вы увидите следующий ответ:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Шаг 5: загрузка Apache Spark

Загрузите последнюю версию Spark, перейдя по следующей ссылке Загрузить Spark . Для этого урока мы используемspark-1.3.1-bin-hadoop2.6версия. После загрузки вы найдете tar-файл Spark в папке загрузки.

Шаг 6. Установка Spark

Следуйте инструкциям ниже для установки Spark.

Извлечение Spark tar

Следующая команда для извлечения искрового tar-файла.

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Перемещение файлов программного обеспечения Spark

Следующие команды для перемещения файлов программного обеспечения Spark в соответствующий каталог (/usr/local/spark).

$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit

Настройка среды для Spark

Добавьте следующую строку в ~/.bashrcфайл. Это означает добавление места, где находится файл программного обеспечения Spark, в переменную PATH.

export PATH = $PATH:/usr/local/spark/bin

Используйте следующую команду для получения файла ~ / .bashrc.

$ source ~/.bashrc

Шаг 7. Проверка установки Spark

Напишите следующую команду для открытия оболочки Spark.

$spark-shell

Если искра установлена ​​успешно, вы увидите следующий результат.

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
    ____             __
   / __/__ ___ _____/ /__
   _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\ version 1.4.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Spark представляет программный модуль для обработки структурированных данных под названием Spark SQL. Он предоставляет программную абстракцию под названием DataFrame и может действовать как механизм распределенных запросов SQL.

Особенности Spark SQL

Ниже приведены особенности Spark SQL:

  • Integrated- Легко смешивайте запросы SQL с программами Spark. Spark SQL позволяет запрашивать структурированные данные в виде распределенного набора данных (RDD) в Spark с интегрированными API-интерфейсами в Python, Scala и Java. Эта тесная интеграция позволяет легко выполнять запросы SQL вместе со сложными аналитическими алгоритмами.

  • Unified Data Access- Загружать и запрашивать данные из различных источников. Schema-RDD предоставляют единый интерфейс для эффективной работы со структурированными данными, включая таблицы Apache Hive, файлы parquet и файлы JSON.

  • Hive Compatibility- Запускать неизмененные запросы Hive на существующих складах. Spark SQL повторно использует интерфейс Hive и MetaStore, обеспечивая полную совместимость с существующими данными, запросами и пользовательскими функциями Hive. Просто установите его вместе с Hive.

  • Standard Connectivity- Подключайтесь через JDBC или ODBC. Spark SQL включает режим сервера с возможностью подключения по отраслевым стандартам JDBC и ODBC.

  • Scalability- Используйте один и тот же движок как для интерактивных, так и для длинных запросов. Spark SQL использует преимущества модели RDD для поддержки отказоустойчивости при выполнении промежуточных запросов, что позволяет масштабировать ее также и для крупных заданий. Не беспокойтесь об использовании другого движка для исторических данных.

Архитектура Spark SQL

На следующем рисунке объясняется архитектура Spark SQL.

Эта архитектура содержит три уровня, а именно: API языка, RDD схемы и источники данных.

  • Language API- Spark совместим с разными языками и Spark SQL. Он также поддерживается этими языками - API (python, scala, java, HiveQL).

  • Schema RDD- Spark Core разработан с использованием специальной структуры данных, называемой RDD. Как правило, Spark SQL работает со схемами, таблицами и записями. Следовательно, мы можем использовать Schema RDD как временную таблицу. Мы можем назвать эту схему RDD как Data Frame.

  • Data Sources- Обычно источником данных для Spark-core является текстовый файл, файл Avro и т. Д. Однако источники данных для Spark SQL отличаются. Это файл Parquet, документ JSON, таблицы HIVE и база данных Cassandra.

Мы обсудим это более подробно в следующих главах.

DataFrame - это распределенный набор данных, который организован в именованные столбцы. Концептуально это эквивалентно реляционным таблицам с хорошими методами оптимизации.

DataFrame может быть построен из массива различных источников, таких как таблицы Hive, файлы структурированных данных, внешние базы данных или существующие RDD. Этот API был разработан для современных приложений для работы с большими данными и науки о данных, основанных наDataFrame in R Programming и Pandas in Python.

Особенности DataFrame

Вот несколько характерных особенностей DataFrame -

  • Возможность обрабатывать данные размером от килобайт до петабайт на кластере с одним узлом и большим кластером.

  • Поддерживает различные форматы данных (Avro, csv, эластичный поиск и Cassandra) и системы хранения (HDFS, таблицы HIVE, mysql и т. Д.).

  • Современная оптимизация и генерация кода с помощью оптимизатора Spark SQL Catalyst (структура преобразования дерева).

  • Может быть легко интегрирован со всеми инструментами и фреймворками больших данных через Spark-Core.

  • Предоставляет API для программирования на Python, Java, Scala и R.

SQLContext

SQLContext - это класс, который используется для инициализации функций Spark SQL. Объект класса SparkContext (sc) требуется для инициализации объекта класса SQLContext.

Следующая команда используется для инициализации SparkContext через spark-shell.

$ spark-shell

По умолчанию объект SparkContext инициализируется именем sc когда запускается искровой снаряд.

Используйте следующую команду для создания SQLContext.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

пример

Давайте рассмотрим пример записей сотрудников в файле JSON с именем employee.json. Используйте следующие команды, чтобы создать DataFrame (df) и прочитать документ JSON с именемemployee.json со следующим содержанием.

employee.json - Поместите этот файл в каталог, в котором находится текущий scala> указатель находится.

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

Операции с DataFrame

DataFrame предоставляет предметно-ориентированный язык для управления структурированными данными. Здесь мы включаем несколько основных примеров обработки структурированных данных с использованием DataFrames.

Следуйте инструкциям ниже, чтобы выполнить операции DataFrame -

Прочтите документ JSON

Во-первых, мы должны прочитать документ JSON. На основе этого сгенерируйте DataFrame с именем (dfs).

Используйте следующую команду, чтобы прочитать документ JSON с именем employee.json. Данные отображаются в виде таблицы с полями - идентификатор, имя и возраст.

scala> val dfs = sqlContext.read.json("employee.json")

Output - Имена полей берутся автоматически из employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Показать данные

Если вы хотите увидеть данные в DataFrame, используйте следующую команду.

scala> dfs.show()

Output - Вы можете увидеть данные о сотрудниках в табличном формате.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Используйте метод printSchema

Если вы хотите увидеть структуру (схему) DataFrame, используйте следующую команду.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Использовать метод выбора

Используйте следующую команду для получения name-столбец среди трех столбцов из DataFrame.

scala> dfs.select("name").show()

Output - Вы можете увидеть значения name столбец.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Использовать возрастной фильтр

Используйте следующую команду для поиска сотрудников, возраст которых превышает 23 года (возраст> 23).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Использовать метод groupBy

Используйте следующую команду для подсчета количества сотрудников одного возраста.

scala> dfs.groupBy("age").count().show()

Output - двое сотрудников - 23 года.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Программный запуск SQL-запросов

SQLContext позволяет приложениям запускать SQL-запросы программно при выполнении функций SQL и возвращает результат в виде DataFrame.

Как правило, в фоновом режиме SparkSQL поддерживает два разных метода преобразования существующих RDD в DataFrames:

Старший Нет Методы и описание
1 Вывод схемы с использованием отражения

Этот метод использует отражение для создания схемы RDD, содержащей определенные типы объектов.

2 Программное указание схемы

Второй метод создания DataFrame - это программный интерфейс, который позволяет вам построить схему, а затем применить ее к существующему RDD.

Интерфейс DataFrame позволяет различным источникам данных работать с Spark SQL. Это временная таблица, которую можно использовать как обычный RDD. Регистрация DataFrame в виде таблицы позволяет запускать SQL-запросы к его данным.

В этой главе мы опишем общие методы загрузки и сохранения данных с использованием различных источников данных Spark. После этого мы подробно обсудим конкретные параметры, доступные для встроенных источников данных.

В SparkSQL доступны различные типы источников данных, некоторые из которых перечислены ниже -

Старший Нет Источники данных
1 Наборы данных JSON

Spark SQL может автоматически захватывать схему набора данных JSON и загружать ее как DataFrame.

2 Таблицы-ульи

Hive поставляется в комплекте с библиотекой Spark как HiveContext, который наследуется от SQLContext.

3 Файлы для паркета

Паркет - это столбчатый формат, поддерживаемый многими системами обработки данных.