Apache Spark - Distribuzione

L'applicazione Spark, utilizzando spark-submit, è un comando shell usato per distribuire l'applicazione Spark in un cluster. Utilizza tutti i rispettivi gestori di cluster tramite un'interfaccia uniforme. Pertanto, non è necessario configurare l'applicazione per ciascuna di esse.

Esempio

Prendiamo lo stesso esempio di conteggio delle parole che abbiamo usato prima, usando i comandi della shell. Qui, consideriamo lo stesso esempio di un'applicazione Spark.

Input di esempio

Il testo seguente è i dati di input e il file denominato è in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Guarda il seguente programma -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Salva il programma sopra in un file denominato SparkWordCount.scala e posizionarlo in una directory definita dall'utente denominata spark-application.

Note - Durante la trasformazione di inputRDD in countRDD, stiamo usando flatMap () per tokenizzare le righe (da file di testo) in parole, il metodo map () per contare la frequenza delle parole e il metodo reduceByKey () per contare ogni ripetizione di parole.

Utilizzare i seguenti passaggi per inviare questa domanda. Esegui tutti i passaggi inspark-application directory tramite il terminale.

Passaggio 1: scarica Spark Ja

Spark core jar è richiesto per la compilazione, quindi scarica spark-core_2.10-1.3.0.jar dal seguente link Spark core jar e sposta il file jar dalla directory di download aspark-application directory.

Passaggio 2: compilare il programma

Compilare il programma precedente utilizzando il comando fornito di seguito. Questo comando dovrebbe essere eseguito dalla directory spark-application. Qui,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar è un jar di supporto Hadoop tratto dalla libreria Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Passaggio 3: creare un JAR

Crea un file jar dell'applicazione Spark utilizzando il seguente comando. Qui,wordcount è il nome del file per il file jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Passaggio 4: inviare l'applicazione Spark

Invia l'applicazione Spark utilizzando il seguente comando:

spark-submit --class SparkWordCount --master local wordcount.jar

Se viene eseguito correttamente, troverai l'output fornito di seguito. IlOKlasciare che il seguente output sia per l'identificazione dell'utente e questa è l'ultima riga del programma. Se leggi attentamente il seguente output, troverai cose diverse, come:

  • avviato con successo il servizio "sparkDriver" sulla porta 42954
  • MemoryStore è stato avviato con una capacità di 267,3 MB
  • SparkUI avviato su http://192.168.1.217:4040
  • File JAR aggiunto: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile at SparkPi.scala: 11) terminato in 0,566 s
  • Interfaccia utente Web di Spark interrotta su http://192.168.1.217:4040
  • MemoryStore cancellato
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Passaggio 5: controllo dell'output

Dopo aver eseguito con successo il programma, troverai la directory denominata outfile nella directory spark-application.

I seguenti comandi vengono utilizzati per aprire e controllare l'elenco dei file nella directory outfile.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

I comandi per il check-in dell'output part-00000 file sono -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

I comandi per il controllo dell'output nel file part-00001 sono:

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Passare alla sezione seguente per saperne di più sul comando "spark-submit".

Sintassi Spark-submit

spark-submit [options] <app jar | python file> [app arguments]

Opzioni

S.No Opzione Descrizione
1 --maestro spark: // host: port, mesos: // host: port, filato o local.
2 - modalità di distribuzione Indica se avviare il programma driver localmente ("client") o su una delle macchine worker all'interno del cluster ("cluster") (impostazione predefinita: client).
3 --classe La classe principale della tua applicazione (per app Java / Scala).
4 --nome Un nome della tua applicazione.
5 --barattoli Elenco separato da virgole di file jar locali da includere nei percorsi classe del driver e dell'esecutore.
6 --pacchi Elenco separato da virgole di coordinate Maven di jar da includere nei percorsi classi del driver e dell'esecutore.
7 - repository Elenco separato da virgole di repository remoti aggiuntivi per cercare le coordinate Maven fornite con --packages.
8 --py-files Elenco separato da virgole di file .zip, .egg o .py da inserire nel PERCORSO PYTHON per le app Python.
9 --File Elenco di file separato da virgole da inserire nella directory di lavoro di ciascun esecutore.
10 --conf (prop = val) Proprietà di configurazione Spark arbitraria.
11 --properties-file Percorso di un file da cui caricare proprietà aggiuntive. Se non specificato, cercherà conf / spark-defaults.
12 --driver-memory Memoria per il driver (es. 1000M, 2G) (impostazione predefinita: 512M).
13 --driver-java-opzioni Opzioni Java aggiuntive da passare al driver.
14 --driver-library-path Voci del percorso della libreria extra da passare al driver.
15 --driver-class-path

Voci del percorso di classe extra da passare al driver.

Nota che i jar aggiunti con --jars vengono automaticamente inclusi nel classpath.

16 --executor-memory Memoria per esecutore (es. 1000M, 2G) (impostazione predefinita: 1G).
17 --proxy-user Utente da impersonare durante l'invio della domanda.
18 --help, -h Mostra questo messaggio di aiuto ed esci.
19 --verbose, -v Stampa l'output di debug aggiuntivo.
20 --versione Stampa la versione dell'attuale Spark.
21 --driver-core NUM Core per il driver (predefinito: 1).
22 --sorvegliare Se fornito, riavvia il driver in caso di errore.
23 --uccidere Se dato, uccide il conducente specificato.
24 --stato Se fornito, richiede lo stato del driver specificato.
25 --total-executor-core Core totali per tutti gli esecutori.
26 --executor-core Numero di core per esecutore. (Predefinito: 1 in modalità YARN o tutti i core disponibili sul worker in modalità standalone).