MapReduce - Guide rapide

MapReduce est un modèle de programmation pour l'écriture d'applications capables de traiter le Big Data en parallèle sur plusieurs nœuds. MapReduce fournit des capacités analytiques pour analyser d'énormes volumes de données complexes.

Qu'est-ce que le Big Data?

Le Big Data est un ensemble de grands ensembles de données qui ne peuvent pas être traités à l'aide des techniques informatiques traditionnelles. Par exemple, le volume de données dont Facebook ou Youtube ont besoin pour collecter et gérer au quotidien, peut entrer dans la catégorie du Big Data. Cependant, le Big Data n'est pas seulement une question d'échelle et de volume, il implique également un ou plusieurs des aspects suivants - Vitesse, Variété, Volume et Complexité.

Pourquoi MapReduce?

Les systèmes d'entreprise traditionnels ont normalement un serveur centralisé pour stocker et traiter les données. L'illustration suivante présente une vue schématique d'un système d'entreprise traditionnel. Le modèle traditionnel n'est certainement pas adapté pour traiter d'énormes volumes de données évolutives et ne peut pas être pris en charge par les serveurs de base de données standard. De plus, le système centralisé crée trop de goulots d'étranglement lors du traitement simultané de plusieurs fichiers.

Google a résolu ce problème de goulot d'étranglement à l'aide d'un algorithme appelé MapReduce. MapReduce divise une tâche en petites parties et les affecte à de nombreux ordinateurs. Plus tard, les résultats sont collectés en un seul endroit et intégrés pour former l'ensemble de données de résultat.

Comment fonctionne MapReduce?

L'algorithme MapReduce contient deux tâches importantes, à savoir Map et Reduce.

  • La tâche de mappage prend un ensemble de données et le convertit en un autre ensemble de données, où les éléments individuels sont décomposés en tuples (paires clé-valeur).

  • La tâche Réduire prend la sortie de la mappe comme entrée et combine ces tuples de données (paires clé-valeur) en un ensemble plus petit de tuples.

La tâche de réduction est toujours effectuée après la tâche de carte.

Examinons maintenant de près chacune des phases et essayons de comprendre leur signification.

  • Input Phase - Ici, nous avons un lecteur d'enregistrement qui traduit chaque enregistrement dans un fichier d'entrée et envoie les données analysées au mappeur sous la forme de paires clé-valeur.

  • Map - Map est une fonction définie par l'utilisateur, qui prend une série de paires clé-valeur et traite chacune d'elles pour générer zéro ou plusieurs paires clé-valeur.

  • Intermediate Keys - Les paires clé-valeur générées par le mappeur sont appelées clés intermédiaires.

  • Combiner- Un combineur est un type de réducteur local qui regroupe des données similaires de la phase cartographique dans des ensembles identifiables. Il prend les clés intermédiaires du mappeur comme entrée et applique un code défini par l'utilisateur pour agréger les valeurs dans une petite étendue d'un mappeur. Il ne fait pas partie de l'algorithme principal de MapReduce; c'est facultatif.

  • Shuffle and Sort- La tâche Réducteur commence par l'étape Aléatoire et tri. Il télécharge les paires clé-valeur groupées sur la machine locale, où le réducteur est exécuté. Les paires clé-valeur individuelles sont triées par clé dans une liste de données plus grande. La liste de données regroupe les clés équivalentes afin que leurs valeurs puissent être facilement itérées dans la tâche Reducer.

  • Reducer- Le réducteur prend les données groupées de paires clé-valeur en entrée et exécute une fonction de réduction sur chacune d'elles. Ici, les données peuvent être agrégées, filtrées et combinées de différentes manières, et cela nécessite un large éventail de traitements. Une fois l'exécution terminée, il donne zéro ou plusieurs paires clé-valeur à l'étape finale.

  • Output Phase - Dans la phase de sortie, nous avons un formateur de sortie qui traduit les paires clé-valeur finales de la fonction Réducteur et les écrit dans un fichier à l'aide d'un enregistreur d'enregistrement.

Essayons de comprendre les deux tâches Map & f Réduire à l'aide d'un petit diagramme -

MapReduce-Exemple

Prenons un exemple du monde réel pour comprendre la puissance de MapReduce. Twitter reçoit environ 500 millions de tweets par jour, soit près de 3000 tweets par seconde. L'illustration suivante montre comment Tweeter gère ses tweets à l'aide de MapReduce.

Comme le montre l'illustration, l'algorithme MapReduce effectue les actions suivantes -

  • Tokenize - Tokenise les tweets en cartes de jetons et les écrit sous forme de paires clé-valeur.

  • Filter - Filtre les mots indésirables des cartes de jetons et écrit les cartes filtrées sous forme de paires clé-valeur.

  • Count - Génère un compteur de jetons par mot.

  • Aggregate Counters - Prépare un agrégat de valeurs de compteur similaires en petites unités gérables.

L'algorithme MapReduce contient deux tâches importantes, à savoir Map et Reduce.

  • La tâche de carte est effectuée au moyen de la classe Mapper
  • La tâche de réduction est effectuée au moyen de la classe de réduction.

La classe Mapper prend l'entrée, la tokenise, la mappe et la trie. La sortie de la classe Mapper est utilisée comme entrée par la classe Reducer, qui à son tour recherche les paires correspondantes et les réduit.

MapReduce implémente divers algorithmes mathématiques pour diviser une tâche en petites parties et les affecter à plusieurs systèmes. En termes techniques, l'algorithme MapReduce aide à envoyer les tâches Map & Reduce aux serveurs appropriés dans un cluster.

Ces algorithmes mathématiques peuvent inclure les éléments suivants:

  • Sorting
  • Searching
  • Indexing
  • TF-IDF

Tri

Le tri est l'un des algorithmes de base de MapReduce pour traiter et analyser les données. MapReduce implémente un algorithme de tri pour trier automatiquement les paires clé-valeur en sortie du mappeur par leurs clés.

  • Les méthodes de tri sont implémentées dans la classe mapper elle-même.

  • Dans la phase Shuffle and Sort, après avoir tokenisé les valeurs dans la classe mapper, le Context class (classe définie par l'utilisateur) collecte les clés valuées correspondantes sous forme de collection.

  • Pour collecter des paires clé-valeur similaires (clés intermédiaires), la classe Mapper prend l'aide de RawComparator class pour trier les paires clé-valeur.

  • L'ensemble des paires clé-valeur intermédiaires pour un Reducer donné est automatiquement trié par Hadoop pour former des valeurs-clés (K2, {V2, V2,…}) avant d'être présentées au Reducer.

Recherche

La recherche joue un rôle important dans l'algorithme MapReduce. Il aide dans la phase de combineur (facultatif) et dans la phase de réduction. Essayons de comprendre comment fonctionne la recherche à l'aide d'un exemple.

Exemple

L'exemple suivant montre comment MapReduce utilise l'algorithme de recherche pour trouver les détails de l'employé qui tire le salaire le plus élevé dans un ensemble de données d'employé donné.

  • Supposons que nous ayons des données d'employés dans quatre fichiers différents - A, B, C et D. Supposons également qu'il y ait des enregistrements d'employés en double dans les quatre fichiers en raison de l'importation répétée des données d'employés de toutes les tables de la base de données. Reportez-vous à l'illustration suivante.

  • The Map phasetraite chaque fichier d'entrée et fournit les données sur les employés par paires clé-valeur (<k, v>: <nom de l'emp, salaire>). Reportez-vous à l'illustration suivante.

  • The combiner phase(technique de recherche) acceptera l'entrée de la phase de carte comme une paire clé-valeur avec le nom et le salaire de l'employé. En utilisant la technique de recherche, le combineur vérifiera tous les salaires des employés pour trouver l'employé le plus élevé dans chaque fichier. Voir l'extrait suivant.

<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary

if(v(second employee).salary > Max){
   Max = v(salary);
}

else{
   Continue checking;
}

Le résultat attendu est le suivant -

<satish, 26000>

<gopal, 50000>

<kiran, 45000>

<manisha, 45000>

  • Reducer phase- Formulaire de chaque dossier, vous trouverez le salarié le plus élevé. Pour éviter la redondance, vérifiez toutes les paires <k, v> et éliminez les entrées en double, le cas échéant. Le même algorithme est utilisé entre les quatre paires <k, v>, qui proviennent de quatre fichiers d'entrée. Le résultat final devrait être comme suit -

<gopal, 50000>

Indexage

Normalement, l'indexation est utilisée pour pointer vers une donnée particulière et son adresse. Il effectue une indexation par lots sur les fichiers d'entrée pour un mappeur particulier.

La technique d'indexation qui est normalement utilisée dans MapReduce est appelée inverted index.Les moteurs de recherche comme Google et Bing utilisent une technique d'indexation inversée. Essayons de comprendre le fonctionnement de l'indexation à l'aide d'un exemple simple.

Exemple

Le texte suivant est l'entrée pour l'indexation inversée. Ici T [0], T [1] et t [2] sont les noms de fichiers et leur contenu est entre guillemets.

T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"

Après avoir appliqué l'algorithme d'indexation, nous obtenons la sortie suivante -

"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}

Ici "a": {2} implique que le terme "a" apparaît dans le fichier T [2]. De même, "est": {0, 1, 2} implique que le terme "est" apparaît dans les fichiers T [0], T [1] et T [2].

TF-IDF

TF-IDF est un algorithme de traitement de texte qui est l'abréviation de Term Frequency - Inverse Document Frequency. C'est l'un des algorithmes d'analyse Web courants. Ici, le terme «fréquence» fait référence au nombre de fois qu'un terme apparaît dans un document.

Fréquence du terme (TF)

Il mesure la fréquence à laquelle un terme particulier apparaît dans un document. Il est calculé par le nombre de fois qu'un mot apparaît dans un document divisé par le nombre total de mots dans ce document.

TF(the) = (Number of times term the ‘the’ appears in a document) / (Total number of terms in the document)

Fréquence inverse des documents (IDF)

Il mesure l'importance d'un terme. Il est calculé par le nombre de documents dans la base de données texte divisé par le nombre de documents contenant un terme spécifique.

Lors du calcul de TF, tous les termes sont considérés comme également importants. Cela signifie que TF compte le terme fréquence pour les mots normaux tels que «est», «a», «quoi», etc. Ainsi, nous devons connaître les termes fréquents tout en augmentant les plus rares, en calculant ce qui suit -

IDF(the) = log_e(Total number of documents / Number of documents with term ‘the’ in it).

L'algorithme est expliqué ci-dessous à l'aide d'un petit exemple.

Exemple

Prenons un document contenant 1000 mots, dans lequel le mot hiveapparaît 50 fois. Le TF pourhive est alors (50/1000) = 0,05.

Maintenant, supposons que nous ayons 10 millions de documents et le mot hiveapparaît dans 1000 d'entre eux. Ensuite, l'IDF est calculé comme suit: log (10 000 000/1 000) = 4.

Le poids TF-IDF est le produit de ces quantités - 0,05 × 4 = 0,20.

MapReduce ne fonctionne que sur les systèmes d'exploitation Linux et il est intégré à un framework Hadoop. Nous devons effectuer les étapes suivantes pour installer le framework Hadoop.

Vérification de l'installation JAVA

Java doit être installé sur votre système avant d'installer Hadoop. Utilisez la commande suivante pour vérifier si Java est installé sur votre système.

$ java –version

Si Java est déjà installé sur votre système, vous obtenez la réponse suivante -

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

Si Java n'est pas installé sur votre système, suivez les étapes ci-dessous.

Installer Java

Étape 1

Téléchargez la dernière version de Java à partir du lien suivant - ce lien .

Après le téléchargement, vous pouvez localiser le fichier jdk-7u71-linux-x64.tar.gz dans votre dossier Téléchargements.

Étape 2

Utilisez les commandes suivantes pour extraire le contenu de jdk-7u71-linux-x64.gz.

$ cd Downloads/
$ ls jdk-7u71-linux-x64.gz $ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz

Étape 3

Pour rendre Java disponible à tous les utilisateurs, vous devez le déplacer vers l'emplacement «/ usr / local /». Allez à la racine et tapez les commandes suivantes -

$ su
password:
# mv jdk1.7.0_71 /usr/local/java
# exit

Étape 4

Pour configurer les variables PATH et JAVA_HOME, ajoutez les commandes suivantes au fichier ~ / .bashrc.

export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin

Appliquez toutes les modifications au système en cours d'exécution.

$ source ~/.bashrc

Étape 5

Utilisez les commandes suivantes pour configurer les alternatives Java -

# alternatives --install /usr/bin/java java usr/local/java/bin/java 2

# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2

# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2

# alternatives --set java usr/local/java/bin/java

# alternatives --set javac usr/local/java/bin/javac

# alternatives --set jar usr/local/java/bin/jar

Vérifiez maintenant l'installation à l'aide de la commande java -version depuis le terminal.

Vérification de l'installation de Hadoop

Hadoop doit être installé sur votre système avant d'installer MapReduce. Vérifions l'installation de Hadoop à l'aide de la commande suivante -

$ hadoop version

Si Hadoop est déjà installé sur votre système, vous obtiendrez la réponse suivante -

Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

Si Hadoop n'est pas installé sur votre système, procédez comme suit.

Téléchargement de Hadoop

Téléchargez Hadoop 2.4.1 depuis Apache Software Foundation et extrayez son contenu à l'aide des commandes suivantes.

$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

Installer Hadoop en mode pseudo-distribué

Les étapes suivantes permettent d'installer Hadoop 2.4.1 en mode pseudo distribué.

Étape 1 - Configuration de Hadoop

Vous pouvez définir des variables d'environnement Hadoop en ajoutant les commandes suivantes au fichier ~ / .bashrc.

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

Appliquez toutes les modifications au système en cours d'exécution.

$ source ~/.bashrc

Étape 2 - Configuration Hadoop

Vous pouvez trouver tous les fichiers de configuration Hadoop à l'emplacement «$ HADOOP_HOME / etc / hadoop». Vous devez apporter les modifications appropriées à ces fichiers de configuration en fonction de votre infrastructure Hadoop.

$ cd $HADOOP_HOME/etc/hadoop

Pour développer des programmes Hadoop à l'aide de Java, vous devez réinitialiser les variables d'environnement Java dans hadoop-env.sh en remplaçant la valeur JAVA_HOME par l'emplacement de Java dans votre système.

export JAVA_HOME=/usr/local/java

Vous devez éditer les fichiers suivants pour configurer Hadoop -

  • core-site.xml
  • hdfs-site.xml
  • yarn-site.xml
  • mapred-site.xml

core-site.xml

core-site.xml contient les informations suivantes -

  • Numéro de port utilisé pour l'instance Hadoop
  • Mémoire allouée au système de fichiers
  • Limite de mémoire pour stocker les données
  • Taille des tampons de lecture / écriture

Ouvrez le fichier core-site.xml et ajoutez les propriétés suivantes entre les balises <configuration> et </configuration>.

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000 </value>
   </property>
</configuration>

hdfs-site.xml

hdfs-site.xml contient les informations suivantes -

  • Valeur des données de réplication
  • Le chemin du namenode
  • Le chemin du datanode de vos systèmes de fichiers locaux (l'endroit où vous souhaitez stocker l'infra Hadoop)

Supposons les données suivantes.

dfs.replication (data replication value) = 1

(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode

Ouvrez ce fichier et ajoutez les propriétés suivantes entre les balises <configuration>, </configuration>.

<configuration>

   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   
   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
   </property>
   
   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
   </property>
   
</configuration>

Note - Dans le fichier ci-dessus, toutes les valeurs de propriété sont définies par l'utilisateur et vous pouvez apporter des modifications en fonction de votre infrastructure Hadoop.

yarn-site.xml

Ce fichier est utilisé pour configurer le fil dans Hadoop. Ouvrez le fichier yarn-site.xml et ajoutez les propriétés suivantes entre les balises <configuration>, </configuration>.

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

Ce fichier est utilisé pour spécifier le framework MapReduce que nous utilisons. Par défaut, Hadoop contient un modèle de yarn-site.xml. Tout d'abord, vous devez copier le fichier de mapred-site.xml.template vers le fichier mapred-site.xml à l'aide de la commande suivante.

$ cp mapred-site.xml.template mapred-site.xml

Ouvrez le fichier mapred-site.xml et ajoutez les propriétés suivantes entre les balises <configuration>, </configuration>.

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

Vérification de l'installation de Hadoop

Les étapes suivantes permettent de vérifier l'installation de Hadoop.

Étape 1 - Configuration du nœud de nom

Configurez le namenode en utilisant la commande "hdfs namenode -format" comme suit -

$ cd ~ $ hdfs namenode -format

Le résultat attendu est le suivant -

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:

/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

Étape 2 - Vérification des fichiers DFS Hadoop

Exécutez la commande suivante pour démarrer votre système de fichiers Hadoop.

$ start-dfs.sh

Le résultat attendu est le suivant -

10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

Étape 3 - Vérification du script de fil

La commande suivante est utilisée pour démarrer le script de fil. L'exécution de cette commande démarrera vos démons yarn.

$ start-yarn.sh

Le résultat attendu est le suivant -

starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

Étape 4 - Accéder à Hadoop sur le navigateur

Le numéro de port par défaut pour accéder à Hadoop est 50070. Utilisez l'URL suivante pour obtenir les services Hadoop sur votre navigateur.

http://localhost:50070/

La capture d'écran suivante montre le navigateur Hadoop.

Étape 5 - Vérifier toutes les applications d'un cluster

Le numéro de port par défaut pour accéder à toutes les applications d'un cluster est 8088. Utilisez l'URL suivante pour utiliser ce service.

http://localhost:8088/

La capture d'écran suivante montre un navigateur de cluster Hadoop.

Dans ce chapitre, nous examinerons de près les classes et leurs méthodes impliquées dans les opérations de programmation MapReduce. Nous allons principalement nous concentrer sur les éléments suivants -

  • Interface JobContext
  • Classe d'emploi
  • Classe Mapper
  • Classe de réducteur

Interface JobContext

L'interface JobContext est la super interface pour toutes les classes, qui définit différents travaux dans MapReduce. Il vous donne une vue en lecture seule du travail fourni aux tâches pendant leur exécution.

Voici les sous-interfaces de l'interface JobContext.

S.No. Description de la sous-interface
1. MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Définit le contexte donné au mappeur.

2. ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Définit le contexte transmis au réducteur.

La classe de travail est la classe principale qui implémente l'interface JobContext.

Classe d'emploi

La classe Job est la classe la plus importante de l'API MapReduce. Il permet à l'utilisateur de configurer le travail, de le soumettre, de contrôler son exécution et d'interroger l'état. Les méthodes définies ne fonctionnent que jusqu'à ce que le travail soit soumis, après quoi elles lèveront une IllegalStateException.

Normalement, l'utilisateur crée l'application, décrit les différentes facettes du travail, puis soumet le travail et surveille sa progression.

Voici un exemple de comment soumettre un travail -

// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

Constructeurs

Voici le résumé du constructeur de la classe Job.

S. Non Résumé du constructeur
1 Job()
2 Job(Configuration conf)
3 Job(Configuration conf, String jobName)

Méthodes

Certaines des méthodes importantes de la classe Job sont les suivantes:

S. Non Description de la méthode
1 getJobName()

Nom du travail spécifié par l'utilisateur.

2 getJobState()

Renvoie l'état actuel du Job.

3 isComplete()

Vérifie si le travail est terminé ou non.

4 setInputFormatClass()

Définit le InputFormat du travail.

5 setJobName(String name)

Définit le nom du travail spécifié par l'utilisateur.

6 setOutputFormatClass()

Définit le format de sortie du travail.

sept setMapperClass(Class)

Définit le mappeur pour le travail.

8 setReducerClass(Class)

Définit le réducteur pour le travail.

9 setPartitionerClass(Class)

Définit le partitionneur pour le travail.

dix setCombinerClass(Class)

Définit le combinateur pour le travail.

Classe Mapper

La classe Mapper définit le travail Map. Mappe les paires clé-valeur d'entrée à un ensemble de paires clé-valeur intermédiaires. Les cartes sont les tâches individuelles qui transforment les enregistrements d'entrée en enregistrements intermédiaires. Les enregistrements intermédiaires transformés n'ont pas besoin d'être du même type que les enregistrements d'entrée. Une paire d'entrée donnée peut correspondre à zéro ou à plusieurs paires de sortie.

Méthode

mapest la méthode la plus importante de la classe Mapper. La syntaxe est définie ci-dessous -

map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)

Cette méthode est appelée une fois pour chaque paire clé-valeur dans le fractionnement d'entrée.

Classe de réducteur

La classe Reducer définit le travail Réduire dans MapReduce. Il réduit un ensemble de valeurs intermédiaires qui partagent une clé à un ensemble de valeurs plus petit. Les implémentations de réducteur peuvent accéder à la configuration d'un travail via la méthode JobContext.getConfiguration (). Un réducteur comporte trois phases principales: mélanger, trier et réduire.

  • Shuffle - Le réducteur copie la sortie triée de chaque mappeur en utilisant HTTP sur le réseau.

  • Sort- Le framework trie les entrées du Reducer par clés (puisque différents Mappers peuvent avoir sorti la même clé). Les phases de mélange et de tri se produisent simultanément, c'est-à-dire que pendant que les sorties sont extraites, elles sont fusionnées.

  • Reduce - Dans cette phase, la méthode de réduction (Object, Iterable, Context) est appelée pour chaque <clé, (collection de valeurs)> dans les entrées triées.

Méthode

reduceest la méthode la plus importante de la classe Reducer. La syntaxe est définie ci-dessous -

reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

Cette méthode est appelée une fois pour chaque clé sur la collection de paires clé-valeur.

MapReduce est un framework utilisé pour écrire des applications pour traiter d'énormes volumes de données sur de grands clusters de matériel de base de manière fiable. Ce chapitre vous présente le fonctionnement de MapReduce dans le framework Hadoop à l'aide de Java.

Algorithme MapReduce

En général, le paradigme de MapReduce est basé sur l'envoi de programmes de réduction de carte aux ordinateurs où résident les données réelles.

  • Au cours d'une tâche MapReduce, Hadoop envoie les tâches de mappage et de réduction aux serveurs appropriés du cluster.

  • Le cadre gère tous les détails de la transmission de données, comme l'émission de tâches, la vérification de l'achèvement des tâches et la copie de données autour du cluster entre les nœuds.

  • La plupart des calculs ont lieu sur les nœuds avec des données sur des disques locaux qui réduisent le trafic réseau.

  • Après avoir terminé une tâche donnée, le cluster collecte et réduit les données pour former un résultat approprié, et les renvoie au serveur Hadoop.

Entrées et sorties (perspective Java)

Le framework MapReduce fonctionne sur des paires clé-valeur, c'est-à-dire que le framework considère l'entrée du travail comme un ensemble de paires clé-valeur et produit un ensemble de paires clé-valeur en tant que sortie du travail, éventuellement de types différents.

Les classes de clé et de valeur doivent être sérialisables par le framework et par conséquent, il est nécessaire d'implémenter l'interface Writable. En outre, les classes de clé doivent implémenter l'interface WritableComparable pour faciliter le tri par le framework.

Le format d'entrée et de sortie d'un travail MapReduce se présente sous la forme de paires clé-valeur -

(Entrée) <k1, v1> -> carte -> <k2, v2> -> réduire -> <k3, v3> (sortie).

Contribution Production
Carte <k1, v1> liste (<k2, v2>)
Réduire <k2, liste (v2)> liste (<k3, v3>)

Implémentation de MapReduce

Le tableau suivant présente les données relatives à la consommation électrique d'une organisation. Le tableau comprend la consommation électrique mensuelle et la moyenne annuelle pour cinq années consécutives.

Jan fév Mar avr Mai Juin juil Août SEP oct nov déc Moy
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Nous devons écrire des applications pour traiter les données d'entrée dans le tableau donné afin de trouver l'année d'utilisation maximale, l'année d'utilisation minimale, etc. Cette tâche est facile pour les programmeurs avec un nombre limité d'enregistrements, car ils écriront simplement la logique pour produire la sortie requise et passeront les données à l'application écrite.

Élevons maintenant l'échelle des données d'entrée. Supposons que nous devions analyser la consommation électrique de toutes les industries à grande échelle d'un État particulier. Lorsque nous écrivons des applications pour traiter de telles données en masse,

  • Leur exécution prendra beaucoup de temps.

  • Il y aura un trafic réseau important lorsque nous transférons les données de la source vers le serveur réseau.

Pour résoudre ces problèmes, nous avons le framework MapReduce.

Des données d'entrée

Les données ci-dessus sont enregistrées sous sample.txtet donné en entrée. Le fichier d'entrée ressemble à l'illustration ci-dessous.

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Exemple de programme

Le programme suivant pour les exemples de données utilise le framework MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Enregistrez le programme ci-dessus dans ProcessUnits.java. La compilation et l'exécution du programme sont données ci-dessous.

Compilation et exécution du programme ProcessUnits

Supposons que nous soyons dans le répertoire personnel de l'utilisateur Hadoop (par exemple / home / hadoop).

Suivez les étapes ci-dessous pour compiler et exécuter le programme ci-dessus.

Step 1 - Utilisez la commande suivante pour créer un répertoire pour stocker les classes java compilées.

$ mkdir units

Step 2- Téléchargez Hadoop-core-1.2.1.jar, qui est utilisé pour compiler et exécuter le programme MapReduce. Téléchargez le fichier jar sur mvnrepository.com . Supposons que le dossier de téléchargement soit / home / hadoop /.

Step 3 - Les commandes suivantes sont utilisées pour compiler le ProcessUnits.java programme et pour créer un fichier jar pour le programme.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - La commande suivante est utilisée pour créer un répertoire d'entrée dans HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - La commande suivante permet de copier le fichier d'entrée nommé sample.txt dans le répertoire d'entrée de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - La commande suivante est utilisée pour vérifier les fichiers dans le répertoire d'entrée

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - La commande suivante est utilisée pour exécuter l'application Eleunit_max en prenant les fichiers d'entrée du répertoire d'entrée.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Attendez un moment jusqu'à ce que le fichier soit exécuté. Après l'exécution, la sortie contient un certain nombre de fractionnements d'entrée, de tâches de mappage, de tâches de réduction, etc.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - La commande suivante est utilisée pour vérifier les fichiers résultants dans le dossier de sortie.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - La commande suivante est utilisée pour voir la sortie dans Part-00000fichier. Ce fichier est généré par HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Voici la sortie générée par le programme MapReduce -

1981 34
1984 40
1985 45

Step 10 - La commande suivante est utilisée pour copier le dossier de sortie de HDFS vers le système de fichiers local.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop

Un partitionneur fonctionne comme une condition dans le traitement d'un ensemble de données d'entrée. La phase de partition a lieu après la phase de carte et avant la phase de réduction.

Le nombre de partitionneurs est égal au nombre de réducteurs. Cela signifie qu'un partitionneur divisera les données en fonction du nombre de réducteurs. Par conséquent, les données transmises à partir d'un seul partitionneur sont traitées par un seul réducteur.

Partitionneur

Un partitionneur partitionne les paires clé-valeur des sorties Map intermédiaires. Il partitionne les données en utilisant une condition définie par l'utilisateur, qui fonctionne comme une fonction de hachage. Le nombre total de partitions est identique au nombre de tâches du réducteur pour le travail. Prenons un exemple pour comprendre comment fonctionne le partitionneur.

Implémentation de MapReduce Partitioner

Pour des raisons de commodité, supposons que nous ayons une petite table appelée Employee avec les données suivantes. Nous utiliserons ces exemples de données comme ensemble de données d'entrée pour démontrer le fonctionnement du partitionneur.

Id Nom Âge Le sexe Un salaire
1201 gopal 45 Masculin 50 000
1202 manisha 40 Femme 50 000
1203 Khalil 34 Masculin 30 000
1204 prasanthe 30 Masculin 30 000
1205 Kiran 20 Masculin 40 000
1206 laxmi 25 Femme 35 000
1207 bhavya 20 Femme 15 000
1208 reshma 19 Femme 15 000
1209 kranthi 22 Masculin 22 000
1210 Satish 24 Masculin 25 000
1211 Krishna 25 Masculin 25 000
1212 Arshad 28 Masculin 20 000
1213 Lavanya 18 Femme 8 000

Nous devons rédiger une application pour traiter l'ensemble de données d'entrée afin de trouver le salarié le plus élevé par sexe dans différentes tranches d'âge (par exemple, moins de 20 ans, entre 21 et 30 ans, plus de 30 ans).

Des données d'entrée

Les données ci-dessus sont enregistrées sous input.txt dans le répertoire «/ home / hadoop / hadoopPartitioner» et donné en entrée.

1201 gopal 45 Masculin 50000
1202 manisha 40 Femme 51 000
1203 khaleel 34 Masculin 30000
1204 prasanthe 30 Masculin 31 000
1205 Kiran 20 Masculin 40000
1206 laxmi 25 Femme 35 000
1207 bhavya 20 Femme 15 000
1208 reshma 19 Femme 14 000
1209 kranthi 22 Masculin 22 000
1210 Satish 24 Masculin 25 000
1211 Krishna 25 Masculin 26 000
1212 Arshad 28 Masculin 20000
1213 Lavanya 18 Femme 8 000

Sur la base de l'entrée donnée, voici l'explication algorithmique du programme.

Mapper les tâches

La tâche de mappage accepte les paires clé-valeur comme entrée tandis que nous avons les données texte dans un fichier texte. L'entrée pour cette tâche cartographique est la suivante -

Input - La clé serait un modèle tel que «toute clé spéciale + nom de fichier + numéro de ligne» (exemple: clé = @ entrée1) et la valeur serait les données de cette ligne (exemple: valeur = 1201 \ t gopal \ t 45 \ t Homme \ t 50000).

Method - Le fonctionnement de cette tâche cartographique est le suivant -

  • Lis le value (données d'enregistrement), qui vient comme valeur d'entrée de la liste d'arguments dans une chaîne.

  • À l'aide de la fonction de fractionnement, séparez le sexe et stockez-le dans une variable chaîne.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Envoyez les informations de genre et les données d'enregistrement value comme paire clé-valeur de sortie de la tâche de mappage vers le partition task.

context.write(new Text(gender), new Text(value));
  • Répétez toutes les étapes ci-dessus pour tous les enregistrements du fichier texte.

Output - Vous obtiendrez les données de genre et la valeur des données d'enregistrement sous forme de paires clé-valeur.

Tâche de partitionnement

La tâche de partitionnement accepte les paires clé-valeur de la tâche de mappage comme entrée. La partition implique la division des données en segments. Selon les critères conditionnels donnés des partitions, les données appariées clé-valeur d'entrée peuvent être divisées en trois parties en fonction des critères d'âge.

Input - L'ensemble des données dans une collection de paires clé-valeur.

key = valeur du champ Sexe dans l'enregistrement.

value = Valeur entière des données d'enregistrement de ce sexe.

Method - Le processus de logique de partition se déroule comme suit.

  • Lisez la valeur du champ d'âge à partir de la paire clé-valeur d'entrée.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Vérifiez la valeur de l'âge avec les conditions suivantes.

    • Âge inférieur ou égal à 20 ans
    • Âge supérieur à 20 ans et inférieur ou égal à 30 ans.
    • Âge supérieur à 30 ans.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- L'ensemble des données des paires clé-valeur est segmenté en trois collections de paires clé-valeur. Le Réducteur travaille individuellement sur chaque collection.

Réduisez les tâches

Le nombre de tâches de partitionnement est égal au nombre de tâches de réduction. Ici, nous avons trois tâches de partitionnement et nous avons donc trois tâches de réduction à exécuter.

Input - Le réducteur s'exécutera trois fois avec une collection différente de paires clé-valeur.

clé = valeur du champ de sexe dans l'enregistrement.

valeur = l'ensemble des données d'enregistrement de ce sexe.

Method - La logique suivante sera appliquée à chaque collection.

  • Lisez la valeur du champ Salaire de chaque enregistrement.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Vérifiez le salaire avec la variable max. Si str [4] est le salaire maximum, affectez str [4] à max, sinon sautez l'étape.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Répétez les étapes 1 et 2 pour chaque remise de clés (les hommes et les femmes sont les clés). Après avoir exécuté ces trois étapes, vous trouverez un salaire maximum de la remise des clés Homme et un salaire maximum de la remise des clés Femme.

context.write(new Text(key), new IntWritable(max));

Output- Enfin, vous obtiendrez un ensemble de données de paires clé-valeur dans trois collections de groupes d'âge différents. Il contient respectivement le salaire maximum de la collection Homme et le salaire maximum de la collection Femme dans chaque tranche d'âge.

Après avoir exécuté les tâches de mappage, de partitionnement et de réduction, les trois collections de données de paires clé-valeur sont stockées dans trois fichiers différents en tant que sortie.

Les trois tâches sont traitées comme des tâches MapReduce. Les exigences et spécifications suivantes de ces travaux doivent être spécifiées dans les configurations -

  • Nom du travail
  • Formats d'entrée et de sortie des clés et des valeurs
  • Classes individuelles pour les tâches de mappage, de réduction et de partitionnement
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Exemple de programme

Le programme suivant montre comment implémenter les partitionneurs pour les critères donnés dans un programme MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Enregistrez le code ci-dessus sous PartitionerExample.javadans «/ home / hadoop / hadoopPartitioner». La compilation et l'exécution du programme sont données ci-dessous.

Compilation et exécution

Supposons que nous soyons dans le répertoire personnel de l'utilisateur Hadoop (par exemple, / home / hadoop).

Suivez les étapes ci-dessous pour compiler et exécuter le programme ci-dessus.

Step 1- Téléchargez Hadoop-core-1.2.1.jar, qui est utilisé pour compiler et exécuter le programme MapReduce. Vous pouvez télécharger le fichier jar sur mvnrepository.com .

Supposons que le dossier téléchargé soit "/ home / hadoop / hadoopPartitioner"

Step 2 - Les commandes suivantes sont utilisées pour compiler le programme PartitionerExample.java et créer un bocal pour le programme.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .

Step 3 - Utilisez la commande suivante pour créer un répertoire d'entrée dans HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Utilisez la commande suivante pour copier le fichier d'entrée nommé input.txt dans le répertoire d'entrée de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Utilisez la commande suivante pour vérifier les fichiers dans le répertoire d'entrée.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Utilisez la commande suivante pour exécuter l'application Top Salaire en prenant les fichiers d'entrée dans le répertoire d'entrée.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Attendez un moment jusqu'à ce que le fichier soit exécuté. Après l'exécution, la sortie contient un certain nombre de fractionnements d'entrée, de tâches de mappage et de tâches de réduction.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Utilisez la commande suivante pour vérifier les fichiers résultants dans le dossier de sortie.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Vous trouverez la sortie dans trois fichiers car vous utilisez trois partitionneurs et trois réducteurs dans votre programme.

Step 8 - Utilisez la commande suivante pour voir la sortie dans Part-00000fichier. Ce fichier est généré par HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Utilisez la commande suivante pour voir la sortie dans Part-00001 fichier.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Utilisez la commande suivante pour voir la sortie dans Part-00002 fichier.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000

Un combineur, également connu sous le nom de semi-reducer, est une classe facultative qui fonctionne en acceptant les entrées de la classe Map et en passant ensuite les paires clé-valeur de sortie à la classe Reducer.

La fonction principale d'un combinateur est de résumer les enregistrements de sortie de carte avec la même clé. La sortie (collection clé-valeur) du combineur sera envoyée sur le réseau à la tâche réelle du réducteur en tant qu'entrée.

Combiner

La classe Combiner est utilisée entre la classe Map et la classe Reduce pour réduire le volume de transfert de données entre Map et Reduce. En général, la sortie de la tâche cartographique est volumineuse et les données transférées vers la tâche de réduction sont élevées.

Le diagramme de tâches MapReduce suivant montre la PHASE DU COMBINER.

Comment fonctionne Combiner?

Voici un bref résumé du fonctionnement de MapReduce Combiner -

  • Un combineur n'a pas d'interface prédéfinie et il doit implémenter la méthode reduction () de l'interface Reducer.

  • Un combineur opère sur chaque clé de sortie de carte. Elle doit avoir les mêmes types de valeur-clé en sortie que la classe Reducer.

  • Un combineur peut produire des informations récapitulatives à partir d'un ensemble de données volumineux car il remplace la sortie Map d'origine.

Bien que Combiner soit facultatif, il permet de séparer les données en plusieurs groupes pour la phase de réduction, ce qui facilite le traitement.

Implémentation de MapReduce Combiner

L'exemple suivant fournit une idée théorique sur les combineurs. Supposons que nous ayons le fichier texte d'entrée suivant nomméinput.txt pour MapReduce.

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

Les phases importantes du programme MapReduce avec Combiner sont décrites ci-dessous.

Lecteur d'enregistrement

Il s'agit de la première phase de MapReduce où le lecteur d'enregistrement lit chaque ligne du fichier texte d'entrée sous forme de texte et génère la sortie sous forme de paires clé-valeur.

Input - Texte ligne par ligne du fichier d'entrée.

Output- Forme les paires clé-valeur. Voici l'ensemble des paires clé-valeur attendues.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Phase de la carte

La phase de mappage prend l'entrée du lecteur d'enregistrement, la traite et produit la sortie sous la forme d'un autre ensemble de paires clé-valeur.

Input - La paire clé-valeur suivante est l'entrée provenant du lecteur d'enregistrement.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

La phase de mappage lit chaque paire clé-valeur, divise chaque mot de la valeur à l'aide de StringTokenizer, traite chaque mot comme clé et le nombre de ce mot comme valeur. L'extrait de code suivant montre la classe Mapper et la fonction de carte.

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

Output - Le résultat attendu est le suivant -

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

Phase de combinaison

La phase Combiner prend chaque paire clé-valeur de la phase Map, la traite et produit la sortie comme key-value collection paires.

Input - La paire clé-valeur suivante est l'entrée extraite de la phase Map.

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

La phase Combiner lit chaque paire clé-valeur, combine les mots communs en tant que clé et les valeurs en tant que collection. Habituellement, le code et le fonctionnement d'un combineur sont similaires à ceux d'un réducteur. Voici l'extrait de code pour la déclaration de classe Mapper, Combiner et Reducer.

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

Output - Le résultat attendu est le suivant -

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Phase de réduction

La phase de réduction prend chaque paire de collection clé-valeur de la phase de combinaison, la traite et transmet la sortie sous forme de paires clé-valeur. Notez que la fonctionnalité Combiner est la même que le Réducteur.

Input - La paire clé-valeur suivante est l'entrée extraite de la phase Combiner.

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

La phase de réduction lit chaque paire clé-valeur. Voici l'extrait de code du Combiner.

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

Output - La sortie attendue de la phase de réduction est la suivante -

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Record Writer

Il s'agit de la dernière phase de MapReduce où le Record Writer écrit chaque paire clé-valeur de la phase Reducer et envoie la sortie sous forme de texte.

Input - Chaque paire clé-valeur de la phase de réduction avec le format de sortie.

Output- Il vous donne les paires clé-valeur au format texte. Voici le résultat attendu.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

Exemple de programme

Le bloc de code suivant compte le nombre de mots dans un programme.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
		
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
		
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
		
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

Enregistrez le programme ci-dessus sous WordCount.java. La compilation et l'exécution du programme sont données ci-dessous.

Compilation et exécution

Supposons que nous soyons dans le répertoire personnel de l'utilisateur Hadoop (par exemple, / home / hadoop).

Suivez les étapes ci-dessous pour compiler et exécuter le programme ci-dessus.

Step 1 - Utilisez la commande suivante pour créer un répertoire pour stocker les classes java compilées.

$ mkdir units

Step 2- Téléchargez Hadoop-core-1.2.1.jar, qui est utilisé pour compiler et exécuter le programme MapReduce. Vous pouvez télécharger le fichier jar sur mvnrepository.com .

Supposons que le dossier téléchargé est / home / hadoop /.

Step 3 - Utilisez les commandes suivantes pour compiler le WordCount.java programme et pour créer un fichier jar pour le programme.

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

Step 4 - Utilisez la commande suivante pour créer un répertoire d'entrée dans HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Utilisez la commande suivante pour copier le fichier d'entrée nommé input.txt dans le répertoire d'entrée de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

Step 6 - Utilisez la commande suivante pour vérifier les fichiers dans le répertoire d'entrée.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Utilisez la commande suivante pour exécuter l'application de comptage de mots en prenant les fichiers d'entrée dans le répertoire d'entrée.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Attendez un moment jusqu'à ce que le fichier soit exécuté. Après l'exécution, la sortie contient un certain nombre de fractionnements d'entrée, de tâches de mappage et de tâches de réduction.

Step 8 - Utilisez la commande suivante pour vérifier les fichiers résultants dans le dossier de sortie.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Utilisez la commande suivante pour voir la sortie dans Part-00000fichier. Ce fichier est généré par HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Voici la sortie générée par le programme MapReduce.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

Ce chapitre explique l'administration Hadoop qui inclut à la fois l'administration HDFS et MapReduce.

  • L'administration HDFS comprend la surveillance de la structure des fichiers HDFS, des emplacements et des fichiers mis à jour.

  • L'administration de MapReduce comprend la surveillance de la liste des applications, la configuration des nœuds, l'état des applications, etc.

Surveillance HDFS

HDFS (Hadoop Distributed File System) contient les répertoires utilisateur, les fichiers d'entrée et les fichiers de sortie. Utilisez les commandes MapReduce,put et get, pour le stockage et la récupération.

Après avoir démarré le framework Hadoop (démons) en passant la commande «start-all.sh» sur «/ $ HADOOP_HOME / sbin», transmettez l'URL suivante au navigateur «http: // localhost: 50070». Vous devriez voir l'écran suivant sur votre navigateur.

La capture d'écran suivante montre comment parcourir le HDFS de navigation.

La capture d'écran suivante montre la structure des fichiers de HDFS. Il montre les fichiers dans le répertoire «/ user / hadoop».

La capture d'écran suivante montre les informations Datanode dans un cluster. Ici vous pouvez trouver un nœud avec ses configurations et capacités.

Surveillance des travaux MapReduce

Une application MapReduce est un ensemble de travaux (travail de mappage, combinateur, partitionneur et travail de réduction). Il est obligatoire de surveiller et de maintenir les éléments suivants -

  • Configuration du datanode là où l'application convient.
  • Le nombre de datanodes et de ressources utilisées par application.

Pour surveiller toutes ces choses, il est impératif que nous ayons une interface utilisateur. Après avoir démarré le framework Hadoop en passant la commande «start-all.sh» sur «/ $ HADOOP_HOME / sbin», transmettez l'URL suivante au navigateur «http: // localhost: 8080». Vous devriez voir l'écran suivant sur votre navigateur.

Dans la capture d'écran ci-dessus, le pointeur de la main se trouve sur l'ID de l'application. Cliquez simplement dessus pour trouver l'écran suivant sur votre navigateur. Il décrit ce qui suit -

  • Sur quel utilisateur l'application actuelle s'exécute

  • Le nom de l'application

  • Type de cette application

  • Statut actuel, statut final

  • Heure de démarrage de l'application, écoulée (temps de fin), si elle est terminée au moment de la surveillance

  • L'historique de cette application, c'est-à-dire les informations du journal

  • Et enfin, les informations sur les nœuds, c'est-à-dire les nœuds qui ont participé à l'exécution de l'application.

La capture d'écran suivante montre les détails d'une application particulière -

La capture d'écran suivante décrit les informations sur les nœuds en cours d'exécution. Ici, la capture d'écran ne contient qu'un seul nœud. Un pointeur de main affiche l'adresse de l'hôte local du nœud en cours d'exécution.