Apache Spark - Distribuzione
L'applicazione Spark, utilizzando spark-submit, è un comando shell usato per distribuire l'applicazione Spark in un cluster. Utilizza tutti i rispettivi gestori di cluster tramite un'interfaccia uniforme. Pertanto, non è necessario configurare l'applicazione per ciascuna di esse.
Esempio
Prendiamo lo stesso esempio di conteggio delle parole che abbiamo usato prima, usando i comandi della shell. Qui, consideriamo lo stesso esempio di un'applicazione Spark.
Input di esempio
Il testo seguente è i dati di input e il file denominato è in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Guarda il seguente programma -
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
Salva il programma sopra in un file denominato SparkWordCount.scala e posizionarlo in una directory definita dall'utente denominata spark-application.
Note - Durante la trasformazione di inputRDD in countRDD, stiamo usando flatMap () per tokenizzare le righe (da file di testo) in parole, il metodo map () per contare la frequenza delle parole e il metodo reduceByKey () per contare ogni ripetizione di parole.
Utilizzare i seguenti passaggi per inviare questa domanda. Esegui tutti i passaggi inspark-application directory tramite il terminale.
Passaggio 1: scarica Spark Ja
Spark core jar è richiesto per la compilazione, quindi scarica spark-core_2.10-1.3.0.jar dal seguente link Spark core jar e sposta il file jar dalla directory di download aspark-application directory.
Passaggio 2: compilare il programma
Compilare il programma precedente utilizzando il comando fornito di seguito. Questo comando dovrebbe essere eseguito dalla directory spark-application. Qui,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar è un jar di supporto Hadoop tratto dalla libreria Spark.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
Passaggio 3: creare un JAR
Crea un file jar dell'applicazione Spark utilizzando il seguente comando. Qui,wordcount è il nome del file per il file jar.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Passaggio 4: inviare l'applicazione Spark
Invia l'applicazione Spark utilizzando il seguente comando:
spark-submit --class SparkWordCount --master local wordcount.jar
Se viene eseguito correttamente, troverai l'output fornito di seguito. IlOKlasciare che il seguente output sia per l'identificazione dell'utente e questa è l'ultima riga del programma. Se leggi attentamente il seguente output, troverai cose diverse, come:
- avviato con successo il servizio "sparkDriver" sulla porta 42954
- MemoryStore è stato avviato con una capacità di 267,3 MB
- SparkUI avviato su http://192.168.1.217:4040
- File JAR aggiunto: /home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile at SparkPi.scala: 11) terminato in 0,566 s
- Interfaccia utente Web di Spark interrotta su http://192.168.1.217:4040
- MemoryStore cancellato
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Passaggio 5: controllo dell'output
Dopo aver eseguito con successo il programma, troverai la directory denominata outfile nella directory spark-application.
I seguenti comandi vengono utilizzati per aprire e controllare l'elenco dei file nella directory outfile.
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
I comandi per il check-in dell'output part-00000 file sono -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
I comandi per il controllo dell'output nel file part-00001 sono:
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Passare alla sezione seguente per saperne di più sul comando "spark-submit".
Sintassi Spark-submit
spark-submit [options] <app jar | python file> [app arguments]
Opzioni
S.No | Opzione | Descrizione |
---|---|---|
1 | --maestro | spark: // host: port, mesos: // host: port, filato o local. |
2 | - modalità di distribuzione | Indica se avviare il programma driver localmente ("client") o su una delle macchine worker all'interno del cluster ("cluster") (impostazione predefinita: client). |
3 | --classe | La classe principale della tua applicazione (per app Java / Scala). |
4 | --nome | Un nome della tua applicazione. |
5 | --barattoli | Elenco separato da virgole di file jar locali da includere nei percorsi classe del driver e dell'esecutore. |
6 | --pacchi | Elenco separato da virgole di coordinate Maven di jar da includere nei percorsi classi del driver e dell'esecutore. |
7 | - repository | Elenco separato da virgole di repository remoti aggiuntivi per cercare le coordinate Maven fornite con --packages. |
8 | --py-files | Elenco separato da virgole di file .zip, .egg o .py da inserire nel PERCORSO PYTHON per le app Python. |
9 | --File | Elenco di file separato da virgole da inserire nella directory di lavoro di ciascun esecutore. |
10 | --conf (prop = val) | Proprietà di configurazione Spark arbitraria. |
11 | --properties-file | Percorso di un file da cui caricare proprietà aggiuntive. Se non specificato, cercherà conf / spark-defaults. |
12 | --driver-memory | Memoria per il driver (es. 1000M, 2G) (impostazione predefinita: 512M). |
13 | --driver-java-opzioni | Opzioni Java aggiuntive da passare al driver. |
14 | --driver-library-path | Voci del percorso della libreria extra da passare al driver. |
15 | --driver-class-path | Voci del percorso di classe extra da passare al driver. Nota che i jar aggiunti con --jars vengono automaticamente inclusi nel classpath. |
16 | --executor-memory | Memoria per esecutore (es. 1000M, 2G) (impostazione predefinita: 1G). |
17 | --proxy-user | Utente da impersonare durante l'invio della domanda. |
18 | --help, -h | Mostra questo messaggio di aiuto ed esci. |
19 | --verbose, -v | Stampa l'output di debug aggiuntivo. |
20 | --versione | Stampa la versione dell'attuale Spark. |
21 | --driver-core NUM | Core per il driver (predefinito: 1). |
22 | --sorvegliare | Se fornito, riavvia il driver in caso di errore. |
23 | --uccidere | Se dato, uccide il conducente specificato. |
24 | --stato | Se fornito, richiede lo stato del driver specificato. |
25 | --total-executor-core | Core totali per tutti gli esecutori. |
26 | --executor-core | Numero di core per esecutore. (Predefinito: 1 in modalità YARN o tutti i core disponibili sul worker in modalità standalone). |