Apache Spark - Programmation principale

Spark Core est la base de l'ensemble du projet. Il fournit des fonctionnalités réparties de répartition des tâches, de planification et d'E / S de base. Spark utilise une structure de données fondamentale spécialisée appelée RDD (Resilient Distributed Datasets) qui est une collection logique de données partitionnées sur des machines. Les RDD peuvent être créés de deux manières; la première consiste à référencer des ensembles de données dans des systèmes de stockage externes et la deuxième consiste à appliquer des transformations (par exemple, mappage, filtre, réducteur, jointure) sur les RDD existants.

L'abstraction RDD est exposée via une API intégrée au langage. Cela simplifie la programmation car la façon dont les applications manipulent les RDD est similaire à la manipulation de collections locales de données.

Shell Spark

Spark fournit un shell interactif - un outil puissant pour analyser les données de manière interactive. Il est disponible en langage Scala ou Python. L'abstraction principale de Spark est une collection distribuée d'éléments appelée jeu de données distribué résilient (RDD). Les RDD peuvent être créés à partir de formats d'entrée Hadoop (tels que des fichiers HDFS) ou en transformant d'autres RDD.

Ouvrez Spark Shell

La commande suivante est utilisée pour ouvrir le shell Spark.

$ spark-shell

Créer un RDD simple

Créons un RDD simple à partir du fichier texte. Utilisez la commande suivante pour créer un RDD simple.

scala> val inputfile = sc.textFile(“input.txt”)

La sortie de la commande ci-dessus est

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

L'API Spark RDD en introduit quelques Transformations et peu Actions pour manipuler RDD.

Transformations RDD

Les transformations RDD renvoient un pointeur vers le nouveau RDD et vous permettent de créer des dépendances entre les RDD. Chaque RDD dans la chaîne de dépendances (String of Dependencies) a une fonction pour calculer ses données et a un pointeur (dépendance) vers son RDD parent.

Spark est paresseux, donc rien ne sera exécuté à moins que vous n'appeliez une transformation ou une action qui déclenchera la création et l'exécution de la tâche. Regardez l'extrait suivant de l'exemple de comptage de mots.

Par conséquent, la transformation RDD n'est pas un ensemble de données mais est une étape d'un programme (peut-être la seule étape) indiquant à Spark comment obtenir des données et quoi en faire.

Vous trouverez ci-dessous une liste des transformations RDD.

S. Non Transformations et signification
1

map(func)

Renvoie un nouvel ensemble de données distribué, formé en passant chaque élément de la source via une fonction func.

2

filter(func)

Renvoie un nouvel ensemble de données formé en sélectionnant les éléments de la source sur lesquels func renvoie vrai.

3

flatMap(func)

Similaire à map, mais chaque élément d'entrée peut être mappé à 0 ou plusieurs éléments de sortie (donc func doit renvoyer un Seq plutôt qu'un seul élément).

4

mapPartitions(func)

Similaire à map, mais s'exécute séparément sur chaque partition (bloc) du RDD, donc func doit être de type Iterator <T> ⇒ Iterator <U> lors de l'exécution sur un RDD de type T.

5

mapPartitionsWithIndex(func)

Similaire à la carte des partitions, mais fournit également func avec une valeur entière représentant l'index de la partition, donc func doit être de type (Int, Iterator <T>) ⇒ Iterator <U> lors de l'exécution sur un RDD de type T.

6

sample(withReplacement, fraction, seed)

Échantillon d'un fraction des données, avec ou sans remplacement, en utilisant une graine de générateur de nombres aléatoires donnée.

sept

union(otherDataset)

Renvoie un nouvel ensemble de données qui contient l'union des éléments de l'ensemble de données source et de l'argument.

8

intersection(otherDataset)

Renvoie un nouveau RDD contenant l'intersection des éléments du jeu de données source et de l'argument.

9

distinct([numTasks])

Renvoie un nouvel ensemble de données qui contient les éléments distincts de l'ensemble de données source.

dix

groupByKey([numTasks])

Lorsqu'il est appelé sur un ensemble de données de paires (K, V), renvoie un ensemble de données de paires (K, Iterable <V>).

Note - Si vous groupez afin d'effectuer une agrégation (telle qu'une somme ou une moyenne) sur chaque clé, l'utilisation de reductionByKey ou aggregateByKey donnera de bien meilleures performances.

11

reduceByKey(func, [numTasks])

Lorsqu'il est appelé sur un ensemble de données de paires (K, V), renvoie un ensemble de données de paires (K, V) où les valeurs de chaque clé sont agrégées en utilisant la fonction de réduction donnée func , qui doit être de type (V, V) ⇒ V Comme dans groupByKey, le nombre de tâches de réduction est configurable via un deuxième argument facultatif.

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

Lorsqu'il est appelé sur un ensemble de données de paires (K, V), renvoie un ensemble de données de paires (K, U) où les valeurs de chaque clé sont agrégées à l'aide des fonctions de combinaison données et d'une valeur neutre "zéro". Permet un type de valeur agrégée différent du type de valeur d'entrée, tout en évitant les allocations inutiles. Comme dans groupByKey, le nombre de tâches de réduction est configurable via un deuxième argument facultatif.

13

sortByKey([ascending], [numTasks])

Lorsqu'il est appelé sur un ensemble de données de paires (K, V) où K implémente Ordered, renvoie un ensemble de données de paires (K, V) triées par clés dans l'ordre croissant ou décroissant, comme spécifié dans l'argument ascendant booléen.

14

join(otherDataset, [numTasks])

Lorsqu'il est appelé sur des ensembles de données de type (K, V) et (K, W), renvoie un ensemble de données de paires (K, (V, W)) avec toutes les paires d'éléments pour chaque clé. Les jointures externes sont prises en charge via leftOuterJoin, rightOuterJoin et fullOuterJoin.

15

cogroup(otherDataset, [numTasks])

Lorsqu'il est appelé sur des ensembles de données de type (K, V) et (K, W), renvoie un ensemble de données de tuples (K, (Iterable <V>, Iterable <W>)). Cette opération est également appelée groupe avec.

16

cartesian(otherDataset)

Lorsqu'il est appelé sur des ensembles de données de types T et U, renvoie un ensemble de données de paires (T, U) (toutes les paires d'éléments).

17

pipe(command, [envVars])

Dirigez chaque partition du RDD via une commande shell, par exemple un script Perl ou bash. Les éléments RDD sont écrits dans le stdin du processus et les lignes sorties vers son stdout sont renvoyées sous la forme d'un RDD de chaînes.

18

coalesce(numPartitions)

Diminuez le nombre de partitions dans le RDD à numPartitions. Utile pour exécuter des opérations plus efficacement après avoir filtré un grand ensemble de données.

19

repartition(numPartitions)

Remaniez les données du RDD de manière aléatoire pour créer plus ou moins de partitions et les équilibrer entre elles. Cela mélange toujours toutes les données sur le réseau.

20

repartitionAndSortWithinPartitions(partitioner)

Repartitionnez le RDD selon le partitionneur donné et, dans chaque partition résultante, triez les enregistrements par leurs clés. C'est plus efficace que d'appeler la répartition puis de trier dans chaque partition, car cela peut pousser le tri vers le bas dans la machine de mélange.

Actions

Le tableau suivant donne une liste d'actions, qui renvoient des valeurs.

S. Non Action et signification
1

reduce(func)

Agréger les éléments de l'ensemble de données à l'aide d'une fonction func(qui prend deux arguments et en renvoie un). La fonction doit être commutative et associative pour pouvoir être calculée correctement en parallèle.

2

collect()

Renvoie tous les éléments de l'ensemble de données sous forme de tableau dans le programme pilote. Cela est généralement utile après un filtre ou une autre opération qui renvoie un sous-ensemble suffisamment petit des données.

3

count()

Renvoie le nombre d'éléments dans l'ensemble de données.

4

first()

Renvoie le premier élément de l'ensemble de données (similaire à take (1)).

5

take(n)

Renvoie un tableau avec le premier n éléments de l'ensemble de données.

6

takeSample (withReplacement,num, [seed])

Renvoie un tableau avec un échantillon aléatoire de num éléments de l'ensemble de données, avec ou sans remplacement, spécifiant éventuellement une graine de générateur de nombres aléatoires.

sept

takeOrdered(n, [ordering])

Renvoie le premier n éléments du RDD en utilisant soit leur ordre naturel, soit un comparateur personnalisé.

8

saveAsTextFile(path)

Écrit les éléments de l'ensemble de données sous forme de fichier texte (ou ensemble de fichiers texte) dans un répertoire donné du système de fichiers local, HDFS ou tout autre système de fichiers pris en charge par Hadoop. Spark appelle toString sur chaque élément pour le convertir en une ligne de texte dans le fichier.

9

saveAsSequenceFile(path) (Java and Scala)

Écrit les éléments de l'ensemble de données en tant que fichier de séquence Hadoop dans un chemin donné dans le système de fichiers local, HDFS ou tout autre système de fichiers pris en charge par Hadoop. Ceci est disponible sur les RDD de paires clé-valeur qui implémentent l'interface Writable de Hadoop. Dans Scala, il est également disponible sur les types qui sont implicitement convertibles en Writable (Spark inclut des conversions pour les types de base comme Int, Double, String, etc.).

dix

saveAsObjectFile(path) (Java and Scala)

Écrit les éléments de l'ensemble de données dans un format simple à l'aide de la sérialisation Java, qui peut ensuite être chargée à l'aide de SparkContext.objectFile ().

11

countByKey()

Uniquement disponible sur les RDD de type (K, V). Renvoie une table de hachage de paires (K, Int) avec le nombre de chaque clé.

12

foreach(func)

Exécute une fonction funcsur chaque élément de l'ensemble de données. Ceci est généralement effectué pour des effets secondaires tels que la mise à jour d'un accumulateur ou l'interaction avec des systèmes de stockage externes.

Note- la modification de variables autres que les accumulateurs en dehors de foreach () peut entraîner un comportement indéfini. Voir Comprendre les fermetures pour plus de détails.

Programmation avec RDD

Voyons les implémentations de quelques transformations et actions RDD dans la programmation RDD à l'aide d'un exemple.

Exemple

Prenons un exemple de comptage de mots - Il compte chaque mot apparaissant dans un document. Considérez le texte suivant comme une entrée et est enregistré en tant queinput.txt fichier dans un répertoire personnel.

input.txt - fichier d'entrée.

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

Suivez la procédure ci-dessous pour exécuter l'exemple donné.

Ouvrez Spark-Shell

La commande suivante est utilisée pour ouvrir Spark Shell. Généralement, spark est construit à l'aide de Scala. Par conséquent, un programme Spark s'exécute sur l'environnement Scala.

$ spark-shell

Si le shell Spark s'ouvre avec succès, vous trouverez la sortie suivante. Regardez la dernière ligne de la sortie «Spark context available as sc» signifie que le conteneur Spark est automatiquement créé objet de contexte Spark avec le nomsc. Avant de démarrer la première étape d'un programme, l'objet SparkContext doit être créé.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

Créer un RDD

Tout d'abord, nous devons lire le fichier d'entrée à l'aide de l'API Spark-Scala et créer un RDD.

La commande suivante est utilisée pour lire un fichier à partir d'un emplacement donné. Ici, un nouveau RDD est créé avec le nom de fichier d'entrée. La chaîne qui est donnée en argument dans la méthode textFile («») est le chemin absolu du nom du fichier d'entrée. Cependant, si seul le nom du fichier est donné, cela signifie que le fichier d'entrée se trouve à l'emplacement actuel.

scala> val inputfile = sc.textFile("input.txt")

Exécuter la transformation du nombre de mots

Notre objectif est de compter les mots dans un fichier. Créez une carte plate pour diviser chaque ligne en mots (flatMap(line ⇒ line.split(“ ”)).

Ensuite, lisez chaque mot comme une clé avec une valeur ‘1’ (<clé, valeur> = <mot, 1>) en utilisant la fonction de carte (map(word ⇒ (word, 1)).

Enfin, réduisez ces clés en ajoutant des valeurs de clés similaires (reduceByKey(_+_)).

La commande suivante est utilisée pour exécuter la logique de comptage de mots. Après avoir exécuté cela, vous ne trouverez aucune sortie car ce n'est pas une action, c'est une transformation; pointer un nouveau RDD ou dire à Spark quoi faire avec les données données)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

RDD actuel

Lorsque vous travaillez avec le RDD, si vous voulez en savoir plus sur le RDD actuel, utilisez la commande suivante. Il vous montrera la description du RDD actuel et de ses dépendances pour le débogage.

scala> counts.toDebugString

Mettre en cache les transformations

Vous pouvez marquer un RDD comme persistant en utilisant les méthodes persist () ou cache (). La première fois qu'il est calculé dans une action, il sera conservé en mémoire sur les nœuds. Utilisez la commande suivante pour stocker les transformations intermédiaires en mémoire.

scala> counts.cache()

Application de l'action

L'application d'une action, comme stocker toutes les transformations, aboutit à un fichier texte. L'argument String de la méthode saveAsTextFile («») est le chemin absolu du dossier de sortie. Essayez la commande suivante pour enregistrer la sortie dans un fichier texte. Dans l'exemple suivant, le dossier 'output' se trouve à l'emplacement actuel.

scala> counts.saveAsTextFile("output")

Vérification de la sortie

Ouvrez un autre terminal pour accéder au répertoire personnel (où spark est exécuté dans l'autre terminal). Utilisez les commandes suivantes pour vérifier le répertoire de sortie.

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

La commande suivante est utilisée pour voir la sortie de Part-00000 des dossiers.

[hadoop@localhost output]$ cat part-00000

Production

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

La commande suivante est utilisée pour voir la sortie de Part-00001 des dossiers.

[hadoop@localhost output]$ cat part-00001

Production

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

L'ONU persiste dans le stockage

Avant de persister UN, si vous souhaitez voir l'espace de stockage utilisé pour cette application, utilisez l'URL suivante dans votre navigateur.

http://localhost:4040

Vous verrez l'écran suivant, qui montre l'espace de stockage utilisé pour l'application, qui s'exécute sur le shell Spark.

Si vous souhaitez annuler la persistance de l'espace de stockage d'un RDD particulier, utilisez la commande suivante.

Scala> counts.unpersist()

Vous verrez la sortie comme suit -

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

Pour vérifier l'espace de stockage dans le navigateur, utilisez l'URL suivante.

http://localhost:4040/

Vous verrez l'écran suivant. Il montre l'espace de stockage utilisé pour l'application, qui s'exécute sur le shell Spark.