Apache Spark - Déploiement

L'application Spark, à l'aide de spark-submit, est une commande shell utilisée pour déployer l'application Spark sur un cluster. Il utilise tous les gestionnaires de cluster respectifs via une interface uniforme. Par conséquent, vous n'avez pas à configurer votre application pour chacun d'eux.

Exemple

Prenons le même exemple de décompte de mots que nous avons utilisé auparavant, en utilisant des commandes shell. Ici, nous considérons le même exemple qu'une application Spark.

Exemple d'entrée

Le texte suivant est les données d'entrée et le fichier nommé est in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Regardez le programme suivant -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Enregistrez le programme ci-dessus dans un fichier nommé SparkWordCount.scala et placez-le dans un répertoire défini par l'utilisateur nommé spark-application.

Note - Lors de la transformation de inputRDD en countRDD, nous utilisons flatMap () pour tokeniser les lignes (du fichier texte) en mots, la méthode map () pour compter la fréquence des mots et la méthode reductionByKey () pour compter chaque répétition de mot.

Suivez les étapes suivantes pour soumettre cette demande. Exécutez toutes les étapes duspark-application répertoire via le terminal.

Étape 1: Téléchargez Spark Ja

Spark core jar est requis pour la compilation, par conséquent, téléchargez spark-core_2.10-1.3.0.jar à partir du lien suivant Spark core jar et déplacez le fichier jar du répertoire de téléchargement versspark-application annuaire.

Étape 2: Compilez le programme

Compilez le programme ci-dessus en utilisant la commande ci-dessous. Cette commande doit être exécutée à partir du répertoire spark-application. Ici,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar est un fichier jar de support Hadoop extrait de la bibliothèque Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Étape 3: créer un JAR

Créez un fichier jar de l'application Spark à l'aide de la commande suivante. Ici,wordcount est le nom de fichier du fichier jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Étape 4: Soumettez l'application Spark

Soumettez l'application Spark à l'aide de la commande suivante -

spark-submit --class SparkWordCount --master local wordcount.jar

S'il est exécuté avec succès, vous trouverez le résultat ci-dessous. leOKlaisser dans la sortie suivante est pour l'identification de l'utilisateur et c'est la dernière ligne du programme. Si vous lisez attentivement la sortie suivante, vous trouverez différentes choses, telles que -

  • a démarré avec succès le service 'sparkDriver' sur le port 42954
  • MemoryStore a démarré avec une capacité de 267,3 Mo
  • Démarrage de SparkUI à l'adresse http://192.168.1.217:4040
  • Fichier JAR ajouté: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile sur SparkPi.scala: 11) terminé en 0,566 s
  • Interface utilisateur Web Spark arrêtée à l'adresse http://192.168.1.217:4040
  • MemoryStore effacé
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Étape 5: Vérification de la sortie

Après une exécution réussie du programme, vous trouverez le répertoire nommé outfile dans le répertoire spark-application.

Les commandes suivantes sont utilisées pour ouvrir et vérifier la liste des fichiers dans le répertoire outfile.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Les commandes de vérification de la sortie dans part-00000 fichier sont -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Les commandes pour vérifier la sortie dans le fichier part-00001 sont -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Parcourez la section suivante pour en savoir plus sur la commande 'spark-submit'.

Syntaxe de Spark-submit

spark-submit [options] <app jar | python file> [app arguments]

Options

S. Non Option La description
1 --Maître spark: // hôte: port, mesos: // hôte: port, fil ou local.
2 --deploy-mode S'il faut lancer le programme pilote localement ("client") ou sur l'une des machines de travail à l'intérieur du cluster ("cluster") (par défaut: client).
3 --classe Classe principale de votre application (pour les applications Java / Scala).
4 --Nom Un nom de votre application.
5 - bocaux Liste des fichiers JAR locaux séparés par des virgules à inclure dans les chemins de classe du pilote et de l'exécuteur.
6 --paquets Liste séparée par des virgules des coordonnées maven des fichiers JAR à inclure dans les chemins de classe du pilote et de l'exécuteur.
sept - référentiels Liste séparée par des virgules de référentiels distants supplémentaires pour rechercher les coordonnées maven données avec --packages.
8 --py-fichiers Liste de fichiers .zip, .egg ou .py séparés par des virgules à placer sur le PATH PYTHON pour les applications Python.
9 --des dossiers Liste de fichiers séparés par des virgules à placer dans le répertoire de travail de chaque exécuteur.
dix --conf (prop = val) Propriété de configuration Spark arbitraire.
11 --properties-fichier Chemin vers un fichier à partir duquel charger des propriétés supplémentaires. S'il n'est pas spécifié, cela recherchera les valeurs par défaut de conf / spark.
12 --mémoire-pilote Mémoire pour le pilote (par exemple 1000M, 2G) (par défaut: 512M).
13 --driver-java-options Options Java supplémentaires à transmettre au pilote.
14 --driver-library-path Entrées de chemin de bibliothèque supplémentaires à transmettre au pilote.
15 - chemin-classe-pilote

Entrées de chemin de classe supplémentaires à transmettre au pilote.

Notez que les fichiers jars ajoutés avec --jars sont automatiquement inclus dans le chemin de classe.

16 - mémoire-exécuteur Mémoire par exécuteur (par exemple 1000M, 2G) (par défaut: 1G).
17 --proxy-utilisateur Utilisateur à emprunter l'identité lors de la soumission de la candidature.
18 --help, -h Affichez ce message d'aide et quittez.
19 --verbose, -v Imprimer une sortie de débogage supplémentaire.
20 --version Imprimez la version actuelle de Spark.
21 --driver-cœurs NUM Cœurs pour le pilote (par défaut: 1).
22 --superviser Le cas échéant, redémarre le pilote en cas d'échec.
23 --tuer S'il est donné, tue le pilote spécifié.
24 --statut S'il est donné, demande le statut du pilote spécifié.
25 --total-executor-cores Nombre total de cœurs pour tous les exécuteurs.
26 --executor-cores Nombre de cœurs par exécuteur. (Par défaut: 1 en mode YARN, ou tous les cœurs disponibles sur le worker en mode autonome).