MapReduce - Implémentation Hadoop
MapReduce est un framework utilisé pour écrire des applications afin de traiter d'énormes volumes de données sur de grands clusters de matériel de base de manière fiable. Ce chapitre vous présente le fonctionnement de MapReduce dans le framework Hadoop à l'aide de Java.
Algorithme MapReduce
Généralement, le paradigme MapReduce est basé sur l'envoi de programmes de réduction de carte aux ordinateurs où résident les données réelles.
Au cours d'une tâche MapReduce, Hadoop envoie les tâches de mappage et de réduction aux serveurs appropriés du cluster.
Le cadre gère tous les détails de la transmission de données, tels que l'émission de tâches, la vérification de l'achèvement des tâches et la copie de données autour du cluster entre les nœuds.
La plupart des calculs ont lieu sur les nœuds avec des données sur des disques locaux qui réduisent le trafic réseau.
Après avoir terminé une tâche donnée, le cluster collecte et réduit les données pour former un résultat approprié, et les renvoie au serveur Hadoop.
Entrées et sorties (perspective Java)
Le framework MapReduce fonctionne sur des paires clé-valeur, c'est-à-dire que le framework considère l'entrée du travail comme un ensemble de paires clé-valeur et produit un ensemble de paires clé-valeur en tant que sortie du travail, éventuellement de types différents.
Les classes de clé et de valeur doivent être sérialisables par le framework et par conséquent, il est nécessaire d'implémenter l'interface Writable. En outre, les classes de clés doivent implémenter l'interface WritableComparable pour faciliter le tri par le framework.
Le format d'entrée et de sortie d'un travail MapReduce se présente sous la forme de paires clé-valeur -
(Entrée) <k1, v1> -> carte -> <k2, v2> -> réduire -> <k3, v3> (sortie).
Contribution | Production | |
---|---|---|
Carte | <k1, v1> | liste (<k2, v2>) |
Réduire | <k2, liste (v2)> | liste (<k3, v3>) |
Implémentation de MapReduce
Le tableau suivant présente les données relatives à la consommation électrique d'une organisation. Le tableau comprend la consommation électrique mensuelle et la moyenne annuelle pour cinq années consécutives.
Jan | fév | Mar | avr | Mai | Juin | juil | Août | SEP | oct | nov | déc | Moy | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Nous devons écrire des applications pour traiter les données d'entrée dans le tableau donné afin de trouver l'année d'utilisation maximale, l'année d'utilisation minimale, etc. Cette tâche est facile pour les programmeurs avec un nombre limité d'enregistrements, car ils écriront simplement la logique pour produire la sortie requise et passeront les données à l'application écrite.
Élevons maintenant l'échelle des données d'entrée. Supposons que nous devions analyser la consommation électrique de toutes les industries à grande échelle d'un État particulier. Lorsque nous écrivons des applications pour traiter de telles données en masse,
Leur exécution prendra beaucoup de temps.
Il y aura un trafic réseau important lorsque nous transférons les données de la source vers le serveur réseau.
Pour résoudre ces problèmes, nous avons le framework MapReduce.
Des données d'entrée
Les données ci-dessus sont enregistrées sous sample.txtet donné en entrée. Le fichier d'entrée ressemble à celui ci-dessous.
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Exemple de programme
Le programme suivant pour les exemples de données utilise le framework MapReduce.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Enregistrez le programme ci-dessus dans ProcessUnits.java. La compilation et l'exécution du programme sont données ci-dessous.
Compilation et exécution du programme ProcessUnits
Supposons que nous soyons dans le répertoire personnel de l'utilisateur Hadoop (par exemple / home / hadoop).
Suivez les étapes ci-dessous pour compiler et exécuter le programme ci-dessus.
Step 1 - Utilisez la commande suivante pour créer un répertoire pour stocker les classes java compilées.
$ mkdir units
Step 2- Téléchargez Hadoop-core-1.2.1.jar, qui est utilisé pour compiler et exécuter le programme MapReduce. Téléchargez le fichier jar sur mvnrepository.com . Supposons que le dossier de téléchargement soit / home / hadoop /.
Step 3 - Les commandes suivantes sont utilisées pour compiler le ProcessUnits.java programme et pour créer un fichier jar pour le programme.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 - La commande suivante est utilisée pour créer un répertoire d'entrée dans HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - La commande suivante permet de copier le fichier d'entrée nommé sample.txt dans le répertoire d'entrée de HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 - La commande suivante est utilisée pour vérifier les fichiers dans le répertoire d'entrée
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - La commande suivante est utilisée pour exécuter l'application Eleunit_max en prenant les fichiers d'entrée du répertoire d'entrée.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Attendez un moment jusqu'à ce que le fichier soit exécuté. Après l'exécution, la sortie contient un certain nombre de fractionnements d'entrée, de tâches de mappage, de tâches de réduction, etc.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 - La commande suivante est utilisée pour vérifier les fichiers résultants dans le dossier de sortie.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - La commande suivante est utilisée pour voir la sortie dans Part-00000fichier. Ce fichier est généré par HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Voici la sortie générée par le programme MapReduce -
1981 | 34 |
1984 | 40 |
1985 | 45 |
Step 10 - La commande suivante est utilisée pour copier le dossier de sortie de HDFS vers le système de fichiers local.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop