Apache Spark - Bereitstellung

Die Spark-Anwendung, die spark-submit verwendet, ist ein Shell-Befehl, mit dem die Spark-Anwendung in einem Cluster bereitgestellt wird. Es verwendet alle jeweiligen Cluster-Manager über eine einheitliche Schnittstelle. Daher müssen Sie Ihre Anwendung nicht für jede einzelne konfigurieren.

Beispiel

Nehmen wir das gleiche Beispiel für die Wortanzahl, das wir zuvor mit Shell-Befehlen verwendet haben. Hier betrachten wir das gleiche Beispiel wie eine Funkenanwendung.

Probeneingabe

Der folgende Text enthält die Eingabedaten und die genannte Datei lautet in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Schauen Sie sich das folgende Programm an -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Speichern Sie das obige Programm in einer Datei mit dem Namen SparkWordCount.scala und legen Sie es in einem benutzerdefinierten Verzeichnis mit dem Namen spark-application.

Note - Während wir die inputRDD in countRDD umwandeln, verwenden wir flatMap () zum Tokenisieren der Zeilen (aus der Textdatei) in Wörter, die map () -Methode zum Zählen der Worthäufigkeit und die reduByKey () -Methode zum Zählen jeder Wortwiederholung.

Führen Sie die folgenden Schritte aus, um diesen Antrag einzureichen. Führen Sie alle Schritte in der ausspark-application Verzeichnis über das Terminal.

Schritt 1: Laden Sie Spark Ja herunter

Für die Kompilierung ist eine Spark-Core-JAR erforderlich. Laden Sie daher spark-core_2.10-1.3.0.jar über den folgenden Link herunter: Spark- Core-JAR und verschieben Sie die JAR-Datei aus dem Download-Verzeichnis inspark-application Verzeichnis.

Schritt 2: Programm kompilieren

Kompilieren Sie das obige Programm mit dem unten angegebenen Befehl. Dieser Befehl sollte aus dem Spark-Anwendungsverzeichnis ausgeführt werden. Hier,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar ist ein Hadoop-Support-Jar aus der Spark-Bibliothek.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Schritt 3: Erstellen Sie ein JAR

Erstellen Sie mit dem folgenden Befehl eine JAR-Datei der Spark-Anwendung. Hier,wordcount ist der Dateiname für die JAR-Datei.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Schritt 4: Funkenantrag einreichen

Senden Sie die Funkenanwendung mit dem folgenden Befehl:

spark-submit --class SparkWordCount --master local wordcount.jar

Wenn es erfolgreich ausgeführt wird, finden Sie die unten angegebene Ausgabe. DasOKDas Einlassen der folgenden Ausgabe dient der Benutzeridentifikation und das ist die letzte Zeile des Programms. Wenn Sie die folgende Ausgabe sorgfältig lesen, werden Sie verschiedene Dinge finden, wie -

  • Dienst 'sparkDriver' an Port 42954 wurde erfolgreich gestartet
  • MemoryStore wurde mit einer Kapazität von 267,3 MB gestartet
  • Startete SparkUI unter http://192.168.1.217:4040
  • JAR-Datei hinzugefügt: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile bei SparkPi.scala: 11) wurde in 0,566 s beendet
  • Die Spark-Web-Benutzeroberfläche wurde unter http://192.168.1.217:4040 gestoppt
  • MemoryStore gelöscht
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Schritt 5: Überprüfen der Ausgabe

Nach erfolgreicher Ausführung des Programms finden Sie das Verzeichnis mit dem Namen outfile im Spark-Anwendungsverzeichnis.

Die folgenden Befehle werden zum Öffnen und Überprüfen der Liste der Dateien im Verzeichnis outfile verwendet.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Die Befehle zum Einchecken der Ausgabe part-00000 Datei sind -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Die Befehle zum Überprüfen der Ausgabe in der Datei part-00001 lauten -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Lesen Sie den folgenden Abschnitt, um mehr über den Befehl 'spark-submit' zu erfahren.

Spark-Submit-Syntax

spark-submit [options] <app jar | python file> [app arguments]

Optionen

S.No. Möglichkeit Beschreibung
1 --Meister spark: // host: port, mesos: // host: port, garn oder lokal.
2 --deploy-mode Gibt an, ob das Treiberprogramm lokal ("Client") oder auf einem der Worker-Computer im Cluster ("Cluster") gestartet werden soll (Standard: Client).
3 --Klasse Die Hauptklasse Ihrer Anwendung (für Java / Scala-Apps).
4 --Name Ein Name Ihrer Anwendung.
5 - Gläser Durch Kommas getrennte Liste lokaler Jars, die in die Klassenpfade von Treiber und Executor aufgenommen werden sollen.
6 --Pakete Durch Kommas getrennte Liste der Maven-Koordinaten von Jars, die in den Klassenpfaden von Treiber und Executor enthalten sein sollen.
7 - Repositories Durch Kommas getrennte Liste zusätzlicher Remote-Repositorys zur Suche nach den mit --packages angegebenen Maven-Koordinaten.
8 --py-Dateien Durch Kommas getrennte Liste von .zip-, .egg- oder .py-Dateien, die auf dem PYTHON PATH für Python-Apps abgelegt werden sollen.
9 --Dateien Durch Kommas getrennte Liste der Dateien, die im Arbeitsverzeichnis jedes Executors abgelegt werden sollen.
10 --conf (prop = val) Beliebige Spark-Konfigurationseigenschaft.
11 --properties-Datei Pfad zu einer Datei, aus der zusätzliche Eigenschaften geladen werden sollen. Wenn nicht angegeben, wird nach Conf / Spark-Standardeinstellungen gesucht.
12 --Treiberspeicher Speicher für Treiber (zB 1000M, 2G) (Standard: 512M).
13 --driver-java-options Zusätzliche Java-Optionen, die an den Treiber übergeben werden sollen.
14 - Treiber-Bibliothek-Pfad Zusätzliche Bibliothekspfadeinträge, die an den Treiber übergeben werden sollen.
15 --driver-class-path

Zusätzliche Klassenpfadeinträge, die an den Treiber übergeben werden sollen.

Beachten Sie, dass mit --jars hinzugefügte Jars automatisch in den Klassenpfad aufgenommen werden.

16 --executor-memory Speicher pro Executor (z. B. 1000M, 2G) (Standard: 1G).
17 --proxy-user Benutzer, der sich beim Einreichen des Antrags ausgibt.
18 - Hilfe, -h Diese Hilfemeldung anzeigen und beenden.
19 --verbose, -v Zusätzliche Debug-Ausgabe drucken.
20 --Ausführung Drucken Sie die Version des aktuellen Spark.
21 - Treiberkerne NUM Kerne für den Treiber (Standard: 1).
22 --überwachen Wenn angegeben, wird der Treiber bei einem Fehler neu gestartet.
23 --töten Wenn angegeben, wird der angegebene Treiber beendet.
24 --Status Wenn angegeben, wird der Status des angegebenen Treibers angefordert.
25 --total-executor-cores Gesamtkerne für alle Ausführenden.
26 --Ausführerkerne Anzahl der Kerne pro Executor. (Standard: 1 im YARN-Modus oder alle verfügbaren Kerne des Workers im Standalone-Modus).