Apache Spark - Guide rapide
Les industries utilisent largement Hadoop pour analyser leurs ensembles de données. La raison en est que le framework Hadoop est basé sur un modèle de programmation simple (MapReduce) et qu'il permet une solution informatique évolutive, flexible, tolérante aux pannes et rentable. Ici, la principale préoccupation est de maintenir la vitesse de traitement des grands ensembles de données en termes de temps d'attente entre les requêtes et de temps d'attente pour exécuter le programme.
Spark a été introduit par Apache Software Foundation pour accélérer le processus du logiciel informatique Hadoop.
Contrairement à une croyance commune, Spark is not a modified version of Hadoopet ne dépend pas vraiment de Hadoop car il dispose de sa propre gestion de cluster. Hadoop n'est que l'un des moyens d'implémenter Spark.
Spark utilise Hadoop de deux manières - l'une est storage et le second est processing. Comme Spark dispose de son propre calcul de gestion de cluster, il utilise Hadoop uniquement à des fins de stockage.
Apache Spark
Apache Spark est une technologie de calcul en cluster ultra-rapide, conçue pour des calculs rapides. Il est basé sur Hadoop MapReduce et étend le modèle MapReduce pour l'utiliser efficacement pour plus de types de calculs, ce qui inclut les requêtes interactives et le traitement de flux. La principale caractéristique de Spark est sain-memory cluster computing qui augmente la vitesse de traitement d'une application.
Spark est conçu pour couvrir un large éventail de charges de travail telles que les applications par lots, les algorithmes itératifs, les requêtes interactives et le streaming. Outre la prise en charge de toutes ces charges de travail dans un système respectif, cela réduit la charge de gestion liée à la maintenance d'outils séparés.
Évolution d'Apache Spark
Spark est l'un des sous-projets d'Hadoop développé en 2009 dans AMPLab d'UC Berkeley par Matei Zaharia. Il a été Open Sourced en 2010 sous une licence BSD. Il a été donné à la fondation logicielle Apache en 2013, et maintenant Apache Spark est devenu un projet Apache de haut niveau à partir de février 2014.
Caractéristiques d'Apache Spark
Apache Spark possède les fonctionnalités suivantes.
Speed- Spark permet d'exécuter une application dans un cluster Hadoop, jusqu'à 100 fois plus rapide en mémoire et 10 fois plus rapide lors de l'exécution sur disque. Ceci est possible en réduisant le nombre d'opérations de lecture / écriture sur le disque. Il stocke les données de traitement intermédiaires en mémoire.
Supports multiple languages- Spark fournit des API intégrées en Java, Scala ou Python. Par conséquent, vous pouvez écrire des applications dans différentes langues. Spark propose 80 opérateurs de haut niveau pour les requêtes interactives.
Advanced Analytics- Spark ne prend pas seulement en charge «Map» et «réduire». Il prend également en charge les requêtes SQL, les données de streaming, l'apprentissage automatique (ML) et les algorithmes de graph.
Spark construit sur Hadoop
Le diagramme suivant montre trois façons de créer Spark avec des composants Hadoop.
Il existe trois façons de déployer Spark, comme expliqué ci-dessous.
Standalone- Le déploiement de Spark Standalone signifie que Spark occupe la place au-dessus de HDFS (Hadoop Distributed File System) et de l'espace est alloué pour HDFS, explicitement. Ici, Spark et MapReduce s'exécuteront côte à côte pour couvrir toutes les tâches Spark sur le cluster.
Hadoop Yarn- Le déploiement de Hadoop Yarn signifie, simplement, que Spark fonctionne sur Yarn sans aucune pré-installation ou accès root requis. Il aide à intégrer Spark dans l'écosystème Hadoop ou la pile Hadoop. Il permet à d'autres composants de fonctionner au-dessus de la pile.
Spark in MapReduce (SIMR)- Spark dans MapReduce est utilisé pour lancer le travail Spark en plus du déploiement autonome. Avec SIMR, l'utilisateur peut démarrer Spark et utiliser son shell sans aucun accès administratif.
Composants de Spark
L'illustration suivante décrit les différents composants de Spark.
Apache Spark Core
Spark Core est le moteur d'exécution générale sous-jacent de la plate-forme Spark sur lequel toutes les autres fonctionnalités sont basées. Il fournit un calcul en mémoire et des ensembles de données de référence dans des systèmes de stockage externes.
Spark SQL
Spark SQL est un composant au-dessus de Spark Core qui introduit une nouvelle abstraction de données appelée SchemaRDD, qui prend en charge les données structurées et semi-structurées.
Spark Streaming
Spark Streaming exploite la capacité de planification rapide de Spark Core pour effectuer des analyses de streaming. Il ingère les données en mini-lots et effectue des transformations RDD (Resilient Distributed Datasets) sur ces mini-lots de données.
MLlib (bibliothèque d'apprentissage automatique)
MLlib est un framework d'apprentissage automatique distribué au-dessus de Spark en raison de l'architecture Spark basée sur la mémoire distribuée. C'est, selon les benchmarks, fait par les développeurs MLlib contre les implémentations ALS (Alternating Meast Squares). Spark MLlib est neuf fois plus rapide que la version sur disque Hadoop deApache Mahout (avant que Mahout n'obtienne une interface Spark).
GraphX
GraphX est un framework de traitement de graphes distribué au-dessus de Spark. Il fournit une API pour exprimer le calcul de graphes qui peut modéliser les graphes définis par l'utilisateur à l'aide de l'API d'abstraction Pregel. Il fournit également un runtime optimisé pour cette abstraction.
Ensembles de données distribués résilients
Les ensembles de données distribués résilients (RDD) sont une structure de données fondamentale de Spark. C'est une collection d'objets distribués immuable. Chaque ensemble de données dans RDD est divisé en partitions logiques, qui peuvent être calculées sur différents nœuds du cluster. Les RDD peuvent contenir tout type d'objets Python, Java ou Scala, y compris des classes définies par l'utilisateur.
Formellement, un RDD est une collection d'enregistrements partitionnés en lecture seule. Les RDD peuvent être créés par des opérations déterministes sur des données sur un stockage stable ou sur d'autres RDD. RDD est un ensemble d'éléments tolérants aux pannes pouvant être exploités en parallèle.
Il existe deux façons de créer des RDD - parallelizing une collection existante dans votre programme de pilote, ou referencing a dataset dans un système de stockage externe, tel qu'un système de fichiers partagé, HDFS, HBase ou toute source de données offrant un format d'entrée Hadoop.
Spark utilise le concept de RDD pour réaliser des opérations MapReduce plus rapides et efficaces. Voyons d'abord comment les opérations MapReduce se déroulent et pourquoi elles ne sont pas aussi efficaces.
Le partage de données est lent dans MapReduce
MapReduce est largement adopté pour traiter et générer de grands ensembles de données avec un algorithme parallèle et distribué sur un cluster. Il permet aux utilisateurs d'écrire des calculs parallèles, à l'aide d'un ensemble d'opérateurs de haut niveau, sans avoir à se soucier de la répartition du travail et de la tolérance aux pannes.
Malheureusement, dans la plupart des frameworks actuels, le seul moyen de réutiliser les données entre les calculs (Ex - entre deux travaux MapReduce) est de les écrire sur un système de stockage externe stable (Ex - HDFS). Bien que ce framework fournisse de nombreuses abstractions pour accéder aux ressources de calcul d'un cluster, les utilisateurs en veulent toujours plus.
Tous les deux Iterative et Interactiveles applications nécessitent un partage de données plus rapide entre les travaux parallèles. Le partage des données est lent dans MapReduce en raison dereplication, serialization, et disk IO. En ce qui concerne le système de stockage, la plupart des applications Hadoop passent plus de 90% du temps à effectuer des opérations de lecture-écriture HDFS.
Opérations itératives sur MapReduce
Réutilisez les résultats intermédiaires sur plusieurs calculs dans des applications en plusieurs étapes. L'illustration suivante explique le fonctionnement de l'infrastructure actuelle, tout en effectuant les opérations itératives sur MapReduce. Cela entraîne des frais généraux importants dus à la réplication des données, aux E / S de disque et à la sérialisation, ce qui ralentit le système.
Opérations interactives sur MapReduce
L'utilisateur exécute des requêtes ad hoc sur le même sous-ensemble de données. Chaque requête effectuera les E / S disque sur le stockage stable, ce qui peut dominer le temps d'exécution de l'application.
L'illustration suivante explique le fonctionnement de l'infrastructure actuelle lors des requêtes interactives sur MapReduce.
Partage de données à l'aide de Spark RDD
Le partage des données est lent dans MapReduce en raison de replication, serialization, et disk IO. La plupart des applications Hadoop passent plus de 90% du temps à effectuer des opérations de lecture-écriture HDFS.
Reconnaissant ce problème, les chercheurs ont développé un cadre spécialisé appelé Apache Spark. L'idée clé de l'étincelle estRinsilient Distribué Datasets (RDD); il prend en charge le calcul de traitement en mémoire. Cela signifie qu'il stocke l'état de la mémoire en tant qu'objet dans les tâches et que l'objet peut être partagé entre ces tâches. Le partage de données en mémoire est 10 à 100 fois plus rapide que le réseau et le disque.
Essayons maintenant de découvrir comment les opérations itératives et interactives se déroulent dans Spark RDD.
Opérations itératives sur Spark RDD
L'illustration ci-dessous montre les opérations itératives sur Spark RDD. Il stockera les résultats intermédiaires dans une mémoire distribuée au lieu d'un stockage stable (disque) et rendra le système plus rapide.
Note - Si la mémoire distribuée (RAM) est suffisante pour stocker les résultats intermédiaires (état du JOB), elle stockera ces résultats sur le disque.
Opérations interactives sur Spark RDD
Cette illustration montre les opérations interactives sur Spark RDD. Si différentes requêtes sont exécutées à plusieurs reprises sur le même ensemble de données, ces données particulières peuvent être conservées en mémoire pour de meilleurs temps d'exécution.
Par défaut, chaque RDD transformé peut être recalculé chaque fois que vous exécutez une action dessus. Cependant, vous pouvez égalementpersistun RDD en mémoire, auquel cas Spark conservera les éléments sur le cluster pour un accès beaucoup plus rapide, la prochaine fois que vous l'interrogerez. Il existe également une prise en charge des RDD persistants sur le disque ou répliqués sur plusieurs nœuds.
Spark est le sous-projet de Hadoop. Par conséquent, il est préférable d'installer Spark dans un système basé sur Linux. Les étapes suivantes montrent comment installer Apache Spark.
Étape 1: vérification de l'installation de Java
L'installation de Java est l'une des choses obligatoires lors de l'installation de Spark. Essayez la commande suivante pour vérifier la version JAVA.
$java -version
Si Java est déjà installé sur votre système, vous obtenez la réponse suivante -
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
Si Java n'est pas installé sur votre système, installez Java avant de passer à l'étape suivante.
Étape 2: vérification de l'installation de Scala
Vous devez utiliser le langage Scala pour implémenter Spark. Alors vérifions l'installation de Scala en utilisant la commande suivante.
$scala -version
Si Scala est déjà installé sur votre système, vous obtenez la réponse suivante -
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
Si Scala n'est pas installé sur votre système, passez à l'étape suivante pour l'installation de Scala.
Étape 3: Téléchargement de Scala
Téléchargez la dernière version de Scala en visitant le lien suivant Télécharger Scala . Pour ce tutoriel, nous utilisons la version scala-2.11.6. Après le téléchargement, vous trouverez le fichier tar Scala dans le dossier de téléchargement.
Étape 4: Installation de Scala
Suivez les étapes ci-dessous pour installer Scala.
Extrayez le fichier tar Scala
Tapez la commande suivante pour extraire le fichier tar Scala.
$ tar xvf scala-2.11.6.tgz
Déplacer les fichiers du logiciel Scala
Utilisez les commandes suivantes pour déplacer les fichiers du logiciel Scala vers le répertoire respectif (/usr/local/scala).
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit
Définir PATH pour Scala
Utilisez la commande suivante pour définir PATH pour Scala.
$ export PATH = $PATH:/usr/local/scala/bin
Vérification de l'installation de Scala
Après l'installation, il est préférable de le vérifier. Utilisez la commande suivante pour vérifier l'installation de Scala.
$scala -version
Si Scala est déjà installé sur votre système, vous obtenez la réponse suivante -
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
Étape 5: Téléchargement d'Apache Spark
Téléchargez la dernière version de Spark en visitant le lien suivant Télécharger Spark . Pour ce tutoriel, nous utilisonsspark-1.3.1-bin-hadoop2.6version. Après l'avoir téléchargé, vous trouverez le fichier tar Spark dans le dossier de téléchargement.
Étape 6: Installation de Spark
Suivez les étapes ci-dessous pour installer Spark.
Extraction du goudron d'étincelle
La commande suivante pour extraire le fichier tar spark.
$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz
Déplacement des fichiers logiciels Spark
Les commandes suivantes pour déplacer les fichiers du logiciel Spark vers le répertoire respectif (/usr/local/spark).
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit
Configuration de l'environnement pour Spark
Ajoutez la ligne suivante à ~/.bashrcfichier. Cela signifie ajouter l'emplacement, où se trouve le fichier du logiciel Spark à la variable PATH.
export PATH=$PATH:/usr/local/spark/bin
Utilisez la commande suivante pour rechercher le fichier ~ / .bashrc.
$ source ~/.bashrc
Étape 7: vérification de l'installation de Spark
Écrivez la commande suivante pour ouvrir le shell Spark.
$spark-shell
Si Spark est installé avec succès, vous trouverez la sortie suivante.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Spark Core est la base de l'ensemble du projet. Il fournit des fonctionnalités réparties de répartition des tâches, de planification et d'E / S de base. Spark utilise une structure de données fondamentale spécialisée appelée RDD (Resilient Distributed Datasets) qui est une collection logique de données partitionnées sur des machines. Les RDD peuvent être créés de deux manières; la première consiste à référencer des ensembles de données dans des systèmes de stockage externes et la deuxième consiste à appliquer des transformations (par exemple, mappage, filtre, réducteur, jointure) sur les RDD existants.
L'abstraction RDD est exposée via une API intégrée au langage. Cela simplifie la programmation car la façon dont les applications manipulent les RDD est similaire à la manipulation de collections locales de données.
Shell Spark
Spark fournit un shell interactif - un outil puissant pour analyser les données de manière interactive. Il est disponible en langage Scala ou Python. L'abstraction principale de Spark est une collection distribuée d'éléments appelée jeu de données distribué résilient (RDD). Les RDD peuvent être créés à partir de formats d'entrée Hadoop (tels que des fichiers HDFS) ou en transformant d'autres RDD.
Ouvrez Spark Shell
La commande suivante est utilisée pour ouvrir le shell Spark.
$ spark-shell
Créer un RDD simple
Créons un RDD simple à partir du fichier texte. Utilisez la commande suivante pour créer un RDD simple.
scala> val inputfile = sc.textFile(“input.txt”)
La sortie de la commande ci-dessus est
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
L'API Spark RDD en introduit quelques Transformations et peu Actions pour manipuler RDD.
Transformations RDD
Les transformations RDD renvoient un pointeur vers le nouveau RDD et vous permettent de créer des dépendances entre les RDD. Chaque RDD dans la chaîne de dépendances (String of Dependencies) a une fonction pour calculer ses données et a un pointeur (dépendance) vers son RDD parent.
Spark est paresseux, donc rien ne sera exécuté à moins que vous n'appeliez une transformation ou une action qui déclenchera la création et l'exécution de la tâche. Regardez l'extrait suivant de l'exemple de comptage de mots.
Par conséquent, la transformation RDD n'est pas un ensemble de données mais est une étape d'un programme (peut-être la seule étape) indiquant à Spark comment obtenir des données et quoi en faire.
S.Non | Transformations et signification |
---|---|
1 | map(func) Renvoie un nouvel ensemble de données distribué, formé en passant chaque élément de la source via une fonction func. |
2 | filter(func) Renvoie un nouvel ensemble de données formé en sélectionnant les éléments de la source sur lesquels func renvoie vrai. |
3 | flatMap(func) Similaire à map, mais chaque élément d'entrée peut être mappé à 0 ou plusieurs éléments de sortie (donc func doit renvoyer un Seq plutôt qu'un seul élément). |
4 | mapPartitions(func) Similaire à map, mais s'exécute séparément sur chaque partition (bloc) du RDD, donc func doit être de type Iterator <T> ⇒ Iterator <U> lors de l'exécution sur un RDD de type T. |
5 | mapPartitionsWithIndex(func) Similaire à la carte des partitions, mais fournit également func avec une valeur entière représentant l'index de la partition, donc func doit être de type (Int, Iterator <T>) ⇒ Iterator <U> lors de l'exécution sur un RDD de type T. |
6 | sample(withReplacement, fraction, seed) Échantillon d'un fraction des données, avec ou sans remplacement, en utilisant une graine de générateur de nombres aléatoires donnée. |
sept | union(otherDataset) Renvoie un nouvel ensemble de données qui contient l'union des éléments de l'ensemble de données source et de l'argument. |
8 | intersection(otherDataset) Renvoie un nouveau RDD contenant l'intersection des éléments du jeu de données source et de l'argument. |
9 | distinct([numTasks]) Renvoie un nouvel ensemble de données qui contient les éléments distincts de l'ensemble de données source. |
dix | groupByKey([numTasks]) Lorsqu'il est appelé sur un ensemble de données de paires (K, V), renvoie un ensemble de données de paires (K, Iterable <V>). Note - Si vous groupez afin d'effectuer une agrégation (telle qu'une somme ou une moyenne) sur chaque clé, l'utilisation de reductionByKey ou aggregateByKey donnera de bien meilleures performances. |
11 | reduceByKey(func, [numTasks]) Lorsqu'il est appelé sur un ensemble de données de paires (K, V), renvoie un ensemble de données de paires (K, V) où les valeurs de chaque clé sont agrégées en utilisant la fonction de réduction donnée func , qui doit être de type (V, V) ⇒ V Comme dans groupByKey, le nombre de tâches de réduction est configurable via un deuxième argument facultatif. |
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) Lorsqu'il est appelé sur un ensemble de données de paires (K, V), renvoie un ensemble de données de paires (K, U) où les valeurs de chaque clé sont agrégées à l'aide des fonctions de combinaison données et d'une valeur neutre "zéro". Permet un type de valeur agrégée différent du type de valeur d'entrée, tout en évitant les allocations inutiles. Comme dans groupByKey, le nombre de tâches de réduction est configurable via un deuxième argument facultatif. |
13 | sortByKey([ascending], [numTasks]) Lorsqu'il est appelé sur un ensemble de données de paires (K, V) où K implémente Ordered, renvoie un ensemble de données de paires (K, V) triées par clés dans l'ordre croissant ou décroissant, comme spécifié dans l'argument ascendant booléen. |
14 | join(otherDataset, [numTasks]) Lorsqu'il est appelé sur des ensembles de données de type (K, V) et (K, W), renvoie un ensemble de données de paires (K, (V, W)) avec toutes les paires d'éléments pour chaque clé. Les jointures externes sont prises en charge via leftOuterJoin, rightOuterJoin et fullOuterJoin. |
15 | cogroup(otherDataset, [numTasks]) Lorsqu'il est appelé sur des ensembles de données de type (K, V) et (K, W), renvoie un ensemble de données de tuples (K, (Iterable <V>, Iterable <W>)). Cette opération est également appelée groupe avec. |
16 | cartesian(otherDataset) Lorsqu'il est appelé sur des ensembles de données de types T et U, renvoie un ensemble de données de paires (T, U) (toutes les paires d'éléments). |
17 | pipe(command, [envVars]) Dirigez chaque partition du RDD via une commande shell, par exemple un script Perl ou bash. Les éléments RDD sont écrits dans le stdin du processus et les lignes sorties vers son stdout sont renvoyées sous la forme d'un RDD de chaînes. |
18 | coalesce(numPartitions) Diminuez le nombre de partitions dans le RDD à numPartitions. Utile pour exécuter des opérations plus efficacement après avoir filtré un grand ensemble de données. |
19 | repartition(numPartitions) Remaniez les données du RDD de manière aléatoire pour créer plus ou moins de partitions et les équilibrer entre elles. Cela mélange toujours toutes les données sur le réseau. |
20 | repartitionAndSortWithinPartitions(partitioner) Repartitionnez le RDD selon le partitionneur donné et, dans chaque partition résultante, triez les enregistrements par leurs clés. C'est plus efficace que d'appeler la répartition puis de trier dans chaque partition, car cela peut pousser le tri vers le bas dans la machine de mélange. |
Actions
S.Non | Action et signification |
---|---|
1 | reduce(func) Agréger les éléments de l'ensemble de données à l'aide d'une fonction func(qui prend deux arguments et en renvoie un). La fonction doit être commutative et associative pour pouvoir être calculée correctement en parallèle. |
2 | collect() Renvoie tous les éléments de l'ensemble de données sous forme de tableau dans le programme pilote. Cela est généralement utile après un filtre ou une autre opération qui renvoie un sous-ensemble suffisamment petit des données. |
3 | count() Renvoie le nombre d'éléments dans l'ensemble de données. |
4 | first() Renvoie le premier élément de l'ensemble de données (similaire à take (1)). |
5 | take(n) Renvoie un tableau avec le premier n éléments de l'ensemble de données. |
6 | takeSample (withReplacement,num, [seed]) Renvoie un tableau avec un échantillon aléatoire de num éléments de l'ensemble de données, avec ou sans remplacement, spécifiant éventuellement une graine de générateur de nombres aléatoires. |
sept | takeOrdered(n, [ordering]) Renvoie le premier n éléments du RDD en utilisant soit leur ordre naturel, soit un comparateur personnalisé. |
8 | saveAsTextFile(path) Écrit les éléments de l'ensemble de données sous forme de fichier texte (ou ensemble de fichiers texte) dans un répertoire donné du système de fichiers local, HDFS ou tout autre système de fichiers pris en charge par Hadoop. Spark appelle toString sur chaque élément pour le convertir en une ligne de texte dans le fichier. |
9 | saveAsSequenceFile(path) (Java and Scala) Écrit les éléments de l'ensemble de données en tant que fichier de séquence Hadoop dans un chemin donné dans le système de fichiers local, HDFS ou tout autre système de fichiers pris en charge par Hadoop. Ceci est disponible sur les RDD de paires clé-valeur qui implémentent l'interface Writable de Hadoop. Dans Scala, il est également disponible sur les types qui sont implicitement convertibles en Writable (Spark inclut des conversions pour les types de base comme Int, Double, String, etc.). |
dix | saveAsObjectFile(path) (Java and Scala) Écrit les éléments de l'ensemble de données dans un format simple à l'aide de la sérialisation Java, qui peut ensuite être chargée à l'aide de SparkContext.objectFile (). |
11 | countByKey() Uniquement disponible sur les RDD de type (K, V). Renvoie une table de hachage de paires (K, Int) avec le nombre de chaque clé. |
12 | foreach(func) Exécute une fonction funcsur chaque élément de l'ensemble de données. Ceci est généralement effectué pour des effets secondaires tels que la mise à jour d'un accumulateur ou l'interaction avec des systèmes de stockage externes. Note- la modification de variables autres que les accumulateurs en dehors de foreach () peut entraîner un comportement indéfini. Voir Comprendre les fermetures pour plus de détails. |
Programmation avec RDD
Voyons les implémentations de quelques transformations et actions RDD dans la programmation RDD à l'aide d'un exemple.
Exemple
Prenons un exemple de comptage de mots - Il compte chaque mot apparaissant dans un document. Considérez le texte suivant comme une entrée et est enregistré en tant queinput.txt fichier dans un répertoire personnel.
input.txt - fichier d'entrée.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Suivez la procédure ci-dessous pour exécuter l'exemple donné.
Ouvrez Spark-Shell
La commande suivante est utilisée pour ouvrir Spark Shell. Généralement, spark est construit à l'aide de Scala. Par conséquent, un programme Spark s'exécute sur l'environnement Scala.
$ spark-shell
Si le shell Spark s'ouvre avec succès, vous trouverez la sortie suivante. Regardez la dernière ligne de la sortie «Spark context available as sc» signifie que le conteneur Spark est automatiquement créé objet de contexte Spark avec le nomsc. Avant de démarrer la première étape d'un programme, l'objet SparkContext doit être créé.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Créer un RDD
Tout d'abord, nous devons lire le fichier d'entrée à l'aide de l'API Spark-Scala et créer un RDD.
La commande suivante est utilisée pour lire un fichier à partir d'un emplacement donné. Ici, un nouveau RDD est créé avec le nom de fichier d'entrée. La chaîne qui est donnée en argument dans la méthode textFile («») est le chemin absolu du nom du fichier d'entrée. Cependant, si seul le nom du fichier est donné, cela signifie que le fichier d'entrée se trouve à l'emplacement actuel.
scala> val inputfile = sc.textFile("input.txt")
Exécuter la transformation du nombre de mots
Notre objectif est de compter les mots dans un fichier. Créez une carte plate pour diviser chaque ligne en mots (flatMap(line ⇒ line.split(“ ”)).
Ensuite, lisez chaque mot comme une clé avec une valeur ‘1’ (<clé, valeur> = <mot, 1>) en utilisant la fonction de carte (map(word ⇒ (word, 1)).
Enfin, réduisez ces clés en ajoutant des valeurs de clés similaires (reduceByKey(_+_)).
La commande suivante est utilisée pour exécuter la logique de comptage de mots. Après avoir exécuté cela, vous ne trouverez aucune sortie car ce n'est pas une action, c'est une transformation; pointer un nouveau RDD ou dire à Spark quoi faire avec les données données)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
RDD actuel
Lorsque vous travaillez avec le RDD, si vous voulez en savoir plus sur le RDD actuel, utilisez la commande suivante. Il vous montrera la description du RDD actuel et de ses dépendances pour le débogage.
scala> counts.toDebugString
Mettre en cache les transformations
Vous pouvez marquer un RDD comme persistant en utilisant les méthodes persist () ou cache (). La première fois qu'il est calculé dans une action, il sera conservé en mémoire sur les nœuds. Utilisez la commande suivante pour stocker les transformations intermédiaires en mémoire.
scala> counts.cache()
Application de l'action
L'application d'une action, comme stocker toutes les transformations, aboutit à un fichier texte. L'argument String de la méthode saveAsTextFile («») est le chemin absolu du dossier de sortie. Essayez la commande suivante pour enregistrer la sortie dans un fichier texte. Dans l'exemple suivant, le dossier 'output' se trouve à l'emplacement actuel.
scala> counts.saveAsTextFile("output")
Vérification de la sortie
Ouvrez un autre terminal pour accéder au répertoire personnel (où spark est exécuté dans l'autre terminal). Utilisez les commandes suivantes pour vérifier le répertoire de sortie.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
La commande suivante est utilisée pour voir la sortie de Part-00000 des dossiers.
[hadoop@localhost output]$ cat part-00000
Production
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
La commande suivante est utilisée pour voir la sortie de Part-00001 des dossiers.
[hadoop@localhost output]$ cat part-00001
Production
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
L'ONU persiste dans le stockage
Avant de persister UN, si vous souhaitez voir l'espace de stockage utilisé pour cette application, utilisez l'URL suivante dans votre navigateur.
http://localhost:4040
Vous verrez l'écran suivant, qui montre l'espace de stockage utilisé pour l'application, qui s'exécute sur le shell Spark.
Si vous souhaitez annuler la persistance de l'espace de stockage d'un RDD particulier, utilisez la commande suivante.
Scala> counts.unpersist()
Vous verrez la sortie comme suit -
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
Pour vérifier l'espace de stockage dans le navigateur, utilisez l'URL suivante.
http://localhost:4040/
Vous verrez l'écran suivant. Il montre l'espace de stockage utilisé pour l'application, qui s'exécute sur le shell Spark.
L'application Spark, à l'aide de spark-submit, est une commande shell utilisée pour déployer l'application Spark sur un cluster. Il utilise tous les gestionnaires de cluster respectifs via une interface uniforme. Par conséquent, vous n'avez pas à configurer votre application pour chacun d'eux.
Exemple
Prenons le même exemple de décompte de mots que nous avons utilisé auparavant, en utilisant des commandes shell. Ici, nous considérons le même exemple qu'une application Spark.
Exemple d'entrée
Le texte suivant est les données d'entrée et le fichier nommé est in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Regardez le programme suivant -
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
Enregistrez le programme ci-dessus dans un fichier nommé SparkWordCount.scala et placez-le dans un répertoire défini par l'utilisateur nommé spark-application.
Note - Lors de la transformation de inputRDD en countRDD, nous utilisons flatMap () pour tokeniser les lignes (du fichier texte) en mots, la méthode map () pour compter la fréquence des mots et la méthode reductionByKey () pour compter chaque répétition de mot.
Suivez les étapes suivantes pour soumettre cette demande. Exécutez toutes les étapes duspark-application répertoire via le terminal.
Étape 1: Téléchargez Spark Ja
Spark core jar est requis pour la compilation, par conséquent, téléchargez spark-core_2.10-1.3.0.jar à partir du lien suivant Spark core jar et déplacez le fichier jar du répertoire de téléchargement versspark-application annuaire.
Étape 2: Compilez le programme
Compilez le programme ci-dessus en utilisant la commande ci-dessous. Cette commande doit être exécutée à partir du répertoire spark-application. Ici,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar est un fichier jar de support Hadoop extrait de la bibliothèque Spark.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
Étape 3: créer un JAR
Créez un fichier jar de l'application Spark à l'aide de la commande suivante. Ici,wordcount est le nom de fichier du fichier jar.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Étape 4: Soumettez l'application Spark
Soumettez l'application Spark à l'aide de la commande suivante -
spark-submit --class SparkWordCount --master local wordcount.jar
S'il est exécuté avec succès, vous trouverez le résultat ci-dessous. leOKlaisser dans la sortie suivante est pour l'identification de l'utilisateur et c'est la dernière ligne du programme. Si vous lisez attentivement la sortie suivante, vous trouverez différentes choses, telles que -
- a démarré avec succès le service 'sparkDriver' sur le port 42954
- MemoryStore a démarré avec une capacité de 267,3 Mo
- Démarrage de SparkUI à l'adresse http://192.168.1.217:4040
- Fichier JAR ajouté: /home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile sur SparkPi.scala: 11) terminé en 0,566 s
- Interface utilisateur Web Spark arrêtée à l'adresse http://192.168.1.217:4040
- MemoryStore effacé
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Étape 5: Vérification de la sortie
Après une exécution réussie du programme, vous trouverez le répertoire nommé outfile dans le répertoire spark-application.
Les commandes suivantes sont utilisées pour ouvrir et vérifier la liste des fichiers dans le répertoire outfile.
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
Les commandes de vérification de la sortie dans part-00000 fichier sont -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Les commandes pour vérifier la sortie dans le fichier part-00001 sont -
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Parcourez la section suivante pour en savoir plus sur la commande 'spark-submit'.
Syntaxe de Spark-submit
spark-submit [options] <app jar | python file> [app arguments]
Options
S.Non | Option | La description |
---|---|---|
1 | --Maître | spark: // hôte: port, mesos: // hôte: port, fil ou local. |
2 | --deploy-mode | S'il faut lancer le programme pilote localement ("client") ou sur l'une des machines de travail à l'intérieur du cluster ("cluster") (par défaut: client). |
3 | --classe | Classe principale de votre application (pour les applications Java / Scala). |
4 | --Nom | Un nom de votre application. |
5 | - bocaux | Liste des fichiers JAR locaux séparés par des virgules à inclure dans les chemins de classe du pilote et de l'exécuteur. |
6 | --paquets | Liste séparée par des virgules des coordonnées maven des fichiers JAR à inclure dans les chemins de classe du pilote et de l'exécuteur. |
sept | - référentiels | Liste séparée par des virgules de référentiels distants supplémentaires pour rechercher les coordonnées maven données avec --packages. |
8 | --py-fichiers | Liste de fichiers .zip, .egg ou .py séparés par des virgules à placer sur le PATH PYTHON pour les applications Python. |
9 | --des dossiers | Liste de fichiers séparés par des virgules à placer dans le répertoire de travail de chaque exécuteur. |
dix | --conf (prop = val) | Propriété de configuration Spark arbitraire. |
11 | --properties-fichier | Chemin vers un fichier à partir duquel charger des propriétés supplémentaires. S'il n'est pas spécifié, cela recherchera les valeurs par défaut de conf / spark. |
12 | --mémoire-pilote | Mémoire pour le pilote (par exemple 1000M, 2G) (par défaut: 512M). |
13 | --driver-java-options | Options Java supplémentaires à transmettre au pilote. |
14 | --driver-library-path | Entrées de chemin de bibliothèque supplémentaires à transmettre au pilote. |
15 | - chemin-classe-pilote | Entrées de chemin de classe supplémentaires à transmettre au pilote. Notez que les fichiers jars ajoutés avec --jars sont automatiquement inclus dans le chemin de classe. |
16 | - mémoire-exécuteur | Mémoire par exécuteur (par exemple 1000M, 2G) (par défaut: 1G). |
17 | --proxy-utilisateur | Utilisateur à emprunter l'identité lors de la soumission de la candidature. |
18 | --help, -h | Affichez ce message d'aide et quittez. |
19 | --verbose, -v | Imprimer une sortie de débogage supplémentaire. |
20 | --version | Imprimez la version actuelle de Spark. |
21 | --driver-cœurs NUM | Cœurs pour le pilote (par défaut: 1). |
22 | --superviser | Le cas échéant, redémarre le pilote en cas d'échec. |
23 | --tuer | S'il est donné, tue le pilote spécifié. |
24 | --statut | S'il est donné, demande le statut du pilote spécifié. |
25 | --total-executor-cores | Nombre total de cœurs pour tous les exécuteurs. |
26 | --executor-cores | Nombre de cœurs par exécuteur. (Par défaut: 1 en mode YARN, ou tous les cœurs disponibles sur le worker en mode autonome). |
Spark contient deux types différents de variables partagées - l'un est broadcast variables et le second est accumulators.
Broadcast variables - utilisé pour distribuer efficacement de grandes valeurs.
Accumulators - utilisé pour agréger les informations d'une collection particulière.
Variables de diffusion
Les variables de diffusion permettent au programmeur de conserver une variable en lecture seule en cache sur chaque machine plutôt que d'en envoyer une copie avec les tâches. Ils peuvent être utilisés, par exemple, pour donner à chaque nœud, une copie d'un grand ensemble de données d'entrée, de manière efficace. Spark tente également de distribuer des variables de diffusion à l'aide d'algorithmes de diffusion efficaces pour réduire les coûts de communication.
Les actions Spark sont exécutées à travers un ensemble d'étapes, séparées par des opérations de «shuffle» distribuées. Spark diffuse automatiquement les données communes nécessaires aux tâches à chaque étape.
Les données diffusées de cette manière sont mises en cache sous forme sérialisée et sont désérialisées avant d'exécuter chaque tâche. Cela signifie que la création explicite de variables de diffusion n'est utile que lorsque les tâches à travers plusieurs étapes nécessitent les mêmes données ou lorsque la mise en cache des données sous forme désérialisée est importante.
Les variables de diffusion sont créées à partir d'une variable v en appelant SparkContext.broadcast(v). La variable de diffusion est un wrapper autourv, et sa valeur est accessible en appelant le valueméthode. Le code donné ci-dessous montre ceci -
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
Output -
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
Une fois la variable de diffusion créée, elle doit être utilisée à la place de la valeur v dans toutes les fonctions exécutées sur le cluster, de sorte que vn'est pas expédié aux nœuds plus d'une fois. De plus, l'objetv ne doit pas être modifié après sa diffusion, afin de garantir que tous les nœuds obtiennent la même valeur de la variable de diffusion.
Accumulateurs
Les accumulateurs sont des variables qui ne sont «ajoutées» que par une opération associative et qui peuvent donc être efficacement prises en charge en parallèle. Ils peuvent être utilisés pour implémenter des compteurs (comme dans MapReduce) ou des sommes. Spark prend en charge nativement les accumulateurs de types numériques et les programmeurs peuvent ajouter la prise en charge de nouveaux types. Si des accumulateurs sont créés avec un nom, ils seront affichés dansSpark’s UI. Cela peut être utile pour comprendre la progression des étapes en cours d'exécution (REMARQUE - ce n'est pas encore pris en charge dans Python).
Un accumulateur est créé à partir d'une valeur initiale v en appelant SparkContext.accumulator(v). Les tâches en cours d'exécution sur le cluster peuvent ensuite y être ajoutées à l'aide duaddou l'opérateur + = (en Scala et Python). Cependant, ils ne peuvent pas lire sa valeur. Seul le programme pilote peut lire la valeur de l'accumulateur, en utilisant sonvalue méthode.
Le code donné ci-dessous montre un accumulateur utilisé pour additionner les éléments d'un tableau -
scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
Si vous voulez voir la sortie du code ci-dessus, utilisez la commande suivante -
scala> accum.value
Production
res2: Int = 10
Opérations RDD numériques
Spark vous permet d'effectuer différentes opérations sur des données numériques, à l'aide de l'une des méthodes API prédéfinies. Les opérations numériques de Spark sont implémentées avec un algorithme de streaming qui permet de créer le modèle, un élément à la fois.
Ces opérations sont calculées et renvoyées sous forme de StatusCounter objet en appelant status() méthode.
S.Non | Méthodes et signification |
---|---|
1 | count() Nombre d'éléments dans le RDD. |
2 | Mean() Moyenne des éléments du RDD. |
3 | Sum() Valeur totale des éléments dans le RDD. |
4 | Max() Valeur maximale parmi tous les éléments du RDD. |
5 | Min() Valeur minimale parmi tous les éléments du RDD. |
6 | Variance() Variance des éléments. |
sept | Stdev() Écart-type. |
Si vous souhaitez n'utiliser qu'une de ces méthodes, vous pouvez appeler la méthode correspondante directement sur RDD.