Spark SQL - DataFrames
Um DataFrame é uma coleção distribuída de dados, que é organizada em colunas nomeadas. Conceitualmente, é equivalente a tabelas relacionais com boas técnicas de otimização.
Um DataFrame pode ser construído a partir de uma matriz de fontes diferentes, como tabelas Hive, arquivos de dados estruturados, bancos de dados externos ou RDDs existentes. Esta API foi projetada para aplicativos modernos de Big Data e ciência de dados, inspirando-se emDataFrame in R Programming e Pandas in Python.
Recursos do DataFrame
Aqui está um conjunto de alguns recursos característicos do DataFrame -
Capacidade de processar os dados no tamanho de Kilobytes a Petabytes em um cluster de nó único para cluster grande.
Suporta diferentes formatos de dados (Avro, csv, elastic search e Cassandra) e sistemas de armazenamento (HDFS, tabelas HIVE, mysql, etc).
Otimização de última geração e geração de código por meio do otimizador Spark SQL Catalyst (estrutura de transformação de árvore).
Pode ser facilmente integrado com todas as ferramentas e estruturas de Big Data via Spark-Core.
Fornece API para programação Python, Java, Scala e R.
SQLContext
SQLContext é uma classe e é usado para inicializar as funcionalidades do Spark SQL. O objeto de classe SparkContext (sc) é necessário para inicializar o objeto de classe SQLContext.
O seguinte comando é usado para inicializar o SparkContext por meio do spark-shell.
$ spark-shell
Por padrão, o objeto SparkContext é inicializado com o nome sc quando a faísca começa.
Use o seguinte comando para criar SQLContext.
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
Exemplo
Vamos considerar um exemplo de registros de funcionários em um arquivo JSON chamado employee.json. Use os comandos a seguir para criar um DataFrame (df) e ler um documento JSON chamadoemployee.json com o seguinte conteúdo.
employee.json - Coloque este arquivo no diretório onde o atual scala> o ponteiro está localizado.
{
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
}
Operações de DataFrame
DataFrame fornece uma linguagem específica de domínio para manipulação de dados estruturados. Aqui, incluímos alguns exemplos básicos de processamento de dados estruturados usando DataFrames.
Siga as etapas fornecidas abaixo para realizar operações DataFrame -
Leia o documento JSON
Primeiro, temos que ler o documento JSON. Com base nisso, gere um DataFrame denominado (dfs).
Use o seguinte comando para ler o documento JSON denominado employee.json. Os dados são mostrados como uma tabela com os campos - id, nome e idade.
scala> val dfs = sqlContext.read.json("employee.json")
Output - Os nomes dos campos são retirados automaticamente de employee.json.
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
Mostrar os dados
Se você quiser ver os dados no DataFrame, use o seguinte comando.
scala> dfs.show()
Output - Você pode ver os dados do funcionário em um formato tabular.
<console>:22, took 0.052610 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
| 23 | 1204 | javed |
| 23 | 1205 | prudvi |
+----+------+--------+
Use o método printSchema
Se você quiser ver a Estrutura (Esquema) do DataFrame, use o seguinte comando.
scala> dfs.printSchema()
Output
root
|-- age: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
Use Select Method
Use o seguinte comando para buscar name-coluna entre três colunas do DataFrame.
scala> dfs.select("name").show()
Output - Você pode ver os valores do name coluna.
<console>:22, took 0.044023 s
+--------+
| name |
+--------+
| satish |
| krishna|
| amith |
| javed |
| prudvi |
+--------+
Usar filtro de idade
Use o seguinte comando para localizar os funcionários com idade superior a 23 (idade> 23).
scala> dfs.filter(dfs("age") > 23).show()
Output
<console>:22, took 0.078670 s
+----+------+--------+
|age | id | name |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith |
+----+------+--------+
Use o método groupBy
Use o seguinte comando para contar o número de funcionários que têm a mesma idade.
scala> dfs.groupBy("age").count().show()
Output - dois funcionários têm 23 anos.
<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 | 2 |
| 25 | 1 |
| 28 | 1 |
| 39 | 1 |
+----+-----+
Execução de consultas SQL programaticamente
Um SQLContext permite que os aplicativos executem consultas SQL programaticamente enquanto executam funções SQL e retorna o resultado como um DataFrame.
Geralmente, em segundo plano, SparkSQL oferece suporte a dois métodos diferentes para converter RDDs existentes em DataFrames -
Sr. Não | Métodos e Descrição |
---|---|
1 | Inferindo o esquema usando reflexão Este método usa reflexão para gerar o esquema de um RDD que contém tipos específicos de objetos. |
2 | Especificando programaticamente o esquema O segundo método para criar DataFrame é por meio de uma interface programática que permite construir um esquema e aplicá-lo a um RDD existente. |