Apache Spark - Bereitstellung
Die Spark-Anwendung, die spark-submit verwendet, ist ein Shell-Befehl, mit dem die Spark-Anwendung in einem Cluster bereitgestellt wird. Es verwendet alle jeweiligen Cluster-Manager über eine einheitliche Schnittstelle. Daher müssen Sie Ihre Anwendung nicht für jede einzelne konfigurieren.
Beispiel
Nehmen wir das gleiche Beispiel für die Wortanzahl, das wir zuvor mit Shell-Befehlen verwendet haben. Hier betrachten wir das gleiche Beispiel wie eine Funkenanwendung.
Probeneingabe
Der folgende Text enthält die Eingabedaten und die genannte Datei lautet in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Schauen Sie sich das folgende Programm an -
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
Speichern Sie das obige Programm in einer Datei mit dem Namen SparkWordCount.scala und legen Sie es in einem benutzerdefinierten Verzeichnis mit dem Namen spark-application.
Note - Während wir die inputRDD in countRDD umwandeln, verwenden wir flatMap () zum Tokenisieren der Zeilen (aus der Textdatei) in Wörter, die map () -Methode zum Zählen der Worthäufigkeit und die reduByKey () -Methode zum Zählen jeder Wortwiederholung.
Führen Sie die folgenden Schritte aus, um diesen Antrag einzureichen. Führen Sie alle Schritte in der ausspark-application Verzeichnis über das Terminal.
Schritt 1: Laden Sie Spark Ja herunter
Für die Kompilierung ist eine Spark-Core-JAR erforderlich. Laden Sie daher spark-core_2.10-1.3.0.jar über den folgenden Link herunter: Spark- Core-JAR und verschieben Sie die JAR-Datei aus dem Download-Verzeichnis inspark-application Verzeichnis.
Schritt 2: Programm kompilieren
Kompilieren Sie das obige Programm mit dem unten angegebenen Befehl. Dieser Befehl sollte aus dem Spark-Anwendungsverzeichnis ausgeführt werden. Hier,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar ist ein Hadoop-Support-Jar aus der Spark-Bibliothek.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
Schritt 3: Erstellen Sie ein JAR
Erstellen Sie mit dem folgenden Befehl eine JAR-Datei der Spark-Anwendung. Hier,wordcount ist der Dateiname für die JAR-Datei.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Schritt 4: Funkenantrag einreichen
Senden Sie die Funkenanwendung mit dem folgenden Befehl:
spark-submit --class SparkWordCount --master local wordcount.jar
Wenn es erfolgreich ausgeführt wird, finden Sie die unten angegebene Ausgabe. DasOKDas Einlassen der folgenden Ausgabe dient der Benutzeridentifikation und das ist die letzte Zeile des Programms. Wenn Sie die folgende Ausgabe sorgfältig lesen, werden Sie verschiedene Dinge finden, wie -
- Dienst 'sparkDriver' an Port 42954 wurde erfolgreich gestartet
- MemoryStore wurde mit einer Kapazität von 267,3 MB gestartet
- Startete SparkUI unter http://192.168.1.217:4040
- JAR-Datei hinzugefügt: /home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile bei SparkPi.scala: 11) wurde in 0,566 s beendet
- Die Spark-Web-Benutzeroberfläche wurde unter http://192.168.1.217:4040 gestoppt
- MemoryStore gelöscht
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Schritt 5: Überprüfen der Ausgabe
Nach erfolgreicher Ausführung des Programms finden Sie das Verzeichnis mit dem Namen outfile im Spark-Anwendungsverzeichnis.
Die folgenden Befehle werden zum Öffnen und Überprüfen der Liste der Dateien im Verzeichnis outfile verwendet.
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
Die Befehle zum Einchecken der Ausgabe part-00000 Datei sind -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Die Befehle zum Überprüfen der Ausgabe in der Datei part-00001 lauten -
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Lesen Sie den folgenden Abschnitt, um mehr über den Befehl 'spark-submit' zu erfahren.
Spark-Submit-Syntax
spark-submit [options] <app jar | python file> [app arguments]
Optionen
S.No. | Möglichkeit | Beschreibung |
---|---|---|
1 | --Meister | spark: // host: port, mesos: // host: port, garn oder lokal. |
2 | --deploy-mode | Gibt an, ob das Treiberprogramm lokal ("Client") oder auf einem der Worker-Computer im Cluster ("Cluster") gestartet werden soll (Standard: Client). |
3 | --Klasse | Die Hauptklasse Ihrer Anwendung (für Java / Scala-Apps). |
4 | --Name | Ein Name Ihrer Anwendung. |
5 | - Gläser | Durch Kommas getrennte Liste lokaler Jars, die in die Klassenpfade von Treiber und Executor aufgenommen werden sollen. |
6 | --Pakete | Durch Kommas getrennte Liste der Maven-Koordinaten von Jars, die in den Klassenpfaden von Treiber und Executor enthalten sein sollen. |
7 | - Repositories | Durch Kommas getrennte Liste zusätzlicher Remote-Repositorys zur Suche nach den mit --packages angegebenen Maven-Koordinaten. |
8 | --py-Dateien | Durch Kommas getrennte Liste von .zip-, .egg- oder .py-Dateien, die auf dem PYTHON PATH für Python-Apps abgelegt werden sollen. |
9 | --Dateien | Durch Kommas getrennte Liste der Dateien, die im Arbeitsverzeichnis jedes Executors abgelegt werden sollen. |
10 | --conf (prop = val) | Beliebige Spark-Konfigurationseigenschaft. |
11 | --properties-Datei | Pfad zu einer Datei, aus der zusätzliche Eigenschaften geladen werden sollen. Wenn nicht angegeben, wird nach Conf / Spark-Standardeinstellungen gesucht. |
12 | --Treiberspeicher | Speicher für Treiber (zB 1000M, 2G) (Standard: 512M). |
13 | --driver-java-options | Zusätzliche Java-Optionen, die an den Treiber übergeben werden sollen. |
14 | - Treiber-Bibliothek-Pfad | Zusätzliche Bibliothekspfadeinträge, die an den Treiber übergeben werden sollen. |
15 | --driver-class-path | Zusätzliche Klassenpfadeinträge, die an den Treiber übergeben werden sollen. Beachten Sie, dass mit --jars hinzugefügte Jars automatisch in den Klassenpfad aufgenommen werden. |
16 | --executor-memory | Speicher pro Executor (z. B. 1000M, 2G) (Standard: 1G). |
17 | --proxy-user | Benutzer, der sich beim Einreichen des Antrags ausgibt. |
18 | - Hilfe, -h | Diese Hilfemeldung anzeigen und beenden. |
19 | --verbose, -v | Zusätzliche Debug-Ausgabe drucken. |
20 | --Ausführung | Drucken Sie die Version des aktuellen Spark. |
21 | - Treiberkerne NUM | Kerne für den Treiber (Standard: 1). |
22 | --überwachen | Wenn angegeben, wird der Treiber bei einem Fehler neu gestartet. |
23 | --töten | Wenn angegeben, wird der angegebene Treiber beendet. |
24 | --Status | Wenn angegeben, wird der Status des angegebenen Treibers angefordert. |
25 | --total-executor-cores | Gesamtkerne für alle Ausführenden. |
26 | --Ausführerkerne | Anzahl der Kerne pro Executor. (Standard: 1 im YARN-Modus oder alle verfügbaren Kerne des Workers im Standalone-Modus). |