Apache Spark - implantação

O aplicativo Spark, usando spark-submit, é um comando shell usado para implantar o aplicativo Spark em um cluster. Ele usa todos os respectivos gerenciadores de cluster por meio de uma interface uniforme. Portanto, você não precisa configurar seu aplicativo para cada um.

Exemplo

Tomemos o mesmo exemplo de contagem de palavras que usamos antes, usando comandos shell. Aqui, consideramos o mesmo exemplo de um aplicativo Spark.

Amostra de entrada

O texto a seguir são os dados de entrada e o arquivo nomeado é in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Veja o seguinte programa -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Salve o programa acima em um arquivo chamado SparkWordCount.scala e coloque-o em um diretório definido pelo usuário chamado spark-application.

Note - Ao transformar o inputRDD em countRDD, estamos usando flatMap () para tokenizar as linhas (do arquivo de texto) em palavras, o método map () para contar a frequência da palavra e o método reduceByKey () para contar cada repetição de palavra.

Use as etapas a seguir para enviar este aplicativo. Execute todas as etapas nospark-application diretório através do terminal.

Etapa 1: Baixe o Spark Ja

Spark core jar é necessário para a compilação, portanto, baixe spark-core_2.10-1.3.0.jar do seguinte link Spark core jar e mova o arquivo jar do diretório de download paraspark-application diretório.

Etapa 2: compilar o programa

Compile o programa acima usando o comando fornecido abaixo. Este comando deve ser executado a partir do diretório do aplicativo spark. Aqui,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar é um jar de suporte do Hadoop obtido da biblioteca Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Etapa 3: Criar um JAR

Crie um arquivo jar do aplicativo spark usando o seguinte comando. Aqui,wordcount é o nome do arquivo para o arquivo jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Etapa 4: enviar o aplicativo Spark

Envie o aplicativo Spark usando o seguinte comando -

spark-submit --class SparkWordCount --master local wordcount.jar

Se for executado com sucesso, você encontrará a saída fornecida abaixo. oOKdeixar a seguinte saída é para identificação do usuário e essa é a última linha do programa. Se você ler cuidadosamente a saída a seguir, encontrará coisas diferentes, como -

  • serviço iniciado com sucesso 'sparkDriver' na porta 42954
  • MemoryStore começou com capacidade de 267,3 MB
  • SparkUI iniciado em http://192.168.1.217:4040
  • Arquivo JAR adicionado: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile em SparkPi.scala: 11) terminou em 0,566 s
  • Interrompeu a interface do usuário da Web do Spark em http://192.168.1.217:4040
  • MemoryStore apagado
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Etapa 5: verificar a saída

Após a execução bem-sucedida do programa, você encontrará o diretório chamado outfile no diretório do aplicativo spark.

Os comandos a seguir são usados ​​para abrir e verificar a lista de arquivos no diretório outfile.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Os comandos para verificar a saída em part-00000 arquivo são -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Os comandos para verificar a saída no arquivo parte-00001 são -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Vá até a seção a seguir para saber mais sobre o comando 'spark-submit'.

Sintaxe de envio do Spark

spark-submit [options] <app jar | python file> [app arguments]

Opções

S.Não Opção Descrição
1 --mestre spark: // host: port, mesos: // host: port, yarn ou local.
2 --deploy-mode Se deve iniciar o programa de driver localmente ("cliente") ou em uma das máquinas de trabalho dentro do cluster ("cluster") (Padrão: cliente).
3 --classe A classe principal do seu aplicativo (para aplicativos Java / Scala).
4 --nome Um nome de seu aplicativo.
5 --jars Lista separada por vírgulas de jars locais para incluir nos caminhos de classe do driver e do executor.
6 - pacotes Lista separada por vírgulas de coordenadas maven de jars para incluir nos caminhos de classe do driver e do executor.
7 --repositórios Lista separada por vírgulas de repositórios remotos adicionais para pesquisar as coordenadas maven fornecidas com --packages.
8 --py-files Lista separada por vírgulas de arquivos .zip, .egg ou .py para colocar no PYTHON PATH para aplicativos Python.
9 --arquivos Lista separada por vírgulas de arquivos a serem colocados no diretório de trabalho de cada executor.
10 --conf (prop = val) Propriedade de configuração arbitrária do Spark.
11 --properties-file Caminho para um arquivo do qual carregar propriedades extras. Se não for especificado, ele procurará conf / spark-defaults.
12 --driver-memory Memória para driver (por exemplo, 1000M, 2G) (Padrão: 512M).
13 --driver-java-options Opções extras de Java para passar para o driver.
14 --driver-library-path Entradas de caminho de biblioteca extras para passar para o driver.
15 --driver-class-path

Entradas de caminho de classe extras para passar ao motorista.

Observe que os jars adicionados com --jars são incluídos automaticamente no caminho de classe.

16 --executor-memory Memória por executor (por exemplo, 1000M, 2G) (Padrão: 1G).
17 --proxy-user Usuário para personificar ao enviar o aplicativo.
18 --help, -h Mostre esta mensagem de ajuda e saia.
19 --verbose, -v Imprime saída de depuração adicional.
20 --versão Imprima a versão do Spark atual.
21 --driver-cores NUM Cores para driver (Padrão: 1).
22 --supervisionar Se fornecido, reinicia o driver em caso de falha.
23 --matar Se fornecido, mata o driver especificado.
24 --status Se fornecido, solicita o status do driver especificado.
25 --total-executor-cores Total de núcleos para todos os executores.
26 --executor-cores Número de núcleos por executor. (Padrão: 1 no modo YARN ou todos os núcleos disponíveis no trabalhador no modo autônomo).