MapReduce - Partitionneur
Un partitionneur fonctionne comme une condition dans le traitement d'un ensemble de données d'entrée. La phase de partition a lieu après la phase de carte et avant la phase de réduction.
Le nombre de partitionneurs est égal au nombre de réducteurs. Cela signifie qu'un partitionneur divisera les données en fonction du nombre de réducteurs. Par conséquent, les données transmises à partir d'un seul partitionneur sont traitées par un seul réducteur.
Partitionneur
Un partitionneur partitionne les paires clé-valeur des sorties Map intermédiaires. Il partitionne les données en utilisant une condition définie par l'utilisateur, qui fonctionne comme une fonction de hachage. Le nombre total de partitions est identique au nombre de tâches du réducteur pour le travail. Prenons un exemple pour comprendre comment fonctionne le partitionneur.
Implémentation de MapReduce Partitioner
Pour des raisons de commodité, supposons que nous ayons une petite table appelée Employee avec les données suivantes. Nous utiliserons ces exemples de données comme ensemble de données d'entrée pour démontrer le fonctionnement du partitionneur.
Id | Nom | Âge | Le sexe | Un salaire |
---|---|---|---|---|
1201 | gopal | 45 | Masculin | 50 000 |
1202 | manisha | 40 | Femme | 50 000 |
1203 | Khalil | 34 | Masculin | 30 000 |
1204 | prasanthe | 30 | Masculin | 30 000 |
1205 | Kiran | 20 | Masculin | 40 000 |
1206 | laxmi | 25 | Femme | 35 000 |
1207 | bhavya | 20 | Femme | 15 000 |
1208 | reshma | 19 | Femme | 15 000 |
1209 | kranthi | 22 | Masculin | 22 000 |
1210 | Satish | 24 | Masculin | 25 000 |
1211 | Krishna | 25 | Masculin | 25 000 |
1212 | Arshad | 28 | Masculin | 20 000 |
1213 | Lavanya | 18 | Femme | 8 000 |
Nous devons rédiger une application pour traiter l'ensemble de données d'entrée afin de trouver le salarié le plus élevé par sexe dans différentes tranches d'âge (par exemple, moins de 20 ans, entre 21 et 30 ans, plus de 30 ans).
Des données d'entrée
Les données ci-dessus sont enregistrées sous input.txt dans le répertoire «/ home / hadoop / hadoopPartitioner» et donné en entrée.
1201 | gopal | 45 | Masculin | 50000 |
1202 | manisha | 40 | Femme | 51 000 |
1203 | khaleel | 34 | Masculin | 30000 |
1204 | prasanthe | 30 | Masculin | 31 000 |
1205 | Kiran | 20 | Masculin | 40000 |
1206 | laxmi | 25 | Femme | 35 000 |
1207 | bhavya | 20 | Femme | 15 000 |
1208 | reshma | 19 | Femme | 14 000 |
1209 | kranthi | 22 | Masculin | 22 000 |
1210 | Satish | 24 | Masculin | 25 000 |
1211 | Krishna | 25 | Masculin | 26 000 |
1212 | Arshad | 28 | Masculin | 20000 |
1213 | Lavanya | 18 | Femme | 8 000 |
Sur la base de l'entrée donnée, voici l'explication algorithmique du programme.
Mapper les tâches
La tâche de mappage accepte les paires clé-valeur comme entrée tandis que nous avons les données texte dans un fichier texte. L'entrée pour cette tâche cartographique est la suivante -
Input - La clé serait un modèle tel que «toute clé spéciale + nom de fichier + numéro de ligne» (exemple: clé = @ entrée1) et la valeur serait les données de cette ligne (exemple: valeur = 1201 \ t gopal \ t 45 \ t Homme \ t 50000).
Method - Le fonctionnement de cette tâche cartographique est le suivant -
Lis le value (données d'enregistrement), qui vient comme valeur d'entrée de la liste d'arguments dans une chaîne.
À l'aide de la fonction de fractionnement, séparez le sexe et stockez-le dans une variable chaîne.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
Envoyez les informations de genre et les données d'enregistrement value comme paire clé-valeur de sortie de la tâche de mappage vers le partition task.
context.write(new Text(gender), new Text(value));
Répétez toutes les étapes ci-dessus pour tous les enregistrements du fichier texte.
Output - Vous obtiendrez les données de genre et la valeur des données d'enregistrement sous forme de paires clé-valeur.
Tâche de partitionnement
La tâche de partitionnement accepte les paires clé-valeur de la tâche de mappage comme entrée. La partition implique la division des données en segments. Selon les critères conditionnels donnés des partitions, les données appariées clé-valeur d'entrée peuvent être divisées en trois parties en fonction des critères d'âge.
Input - L'ensemble des données dans une collection de paires clé-valeur.
key = valeur du champ Sexe dans l'enregistrement.
value = Valeur entière des données d'enregistrement de ce sexe.
Method - Le processus de logique de partition se déroule comme suit.
- Lisez la valeur du champ d'âge à partir de la paire clé-valeur d'entrée.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Vérifiez la valeur de l'âge avec les conditions suivantes.
- Âge inférieur ou égal à 20 ans
- Âge supérieur à 20 ans et inférieur ou égal à 30 ans.
- Âge supérieur à 30 ans.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- L'ensemble des données des paires clé-valeur est segmenté en trois collections de paires clé-valeur. Le Réducteur travaille individuellement sur chaque collection.
Réduisez les tâches
Le nombre de tâches de partitionnement est égal au nombre de tâches de réduction. Ici, nous avons trois tâches de partitionnement et nous avons donc trois tâches de réduction à exécuter.
Input - Le réducteur s'exécutera trois fois avec une collection différente de paires clé-valeur.
clé = valeur du champ de sexe dans l'enregistrement.
valeur = l'ensemble des données d'enregistrement de ce sexe.
Method - La logique suivante sera appliquée à chaque collection.
- Lisez la valeur du champ Salaire de chaque enregistrement.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
Vérifiez le salaire avec la variable max. Si str [4] est le salaire maximum, affectez str [4] à max, sinon sautez l'étape.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Répétez les étapes 1 et 2 pour chaque remise de clés (les hommes et les femmes sont les clés). Après avoir exécuté ces trois étapes, vous trouverez un salaire maximum de la remise des clés Homme et un salaire maximum de la remise des clés Femme.
context.write(new Text(key), new IntWritable(max));
Output- Enfin, vous obtiendrez un ensemble de données de paires clé-valeur dans trois collections de groupes d'âge différents. Il contient respectivement le salaire maximum de la collection Homme et le salaire maximum de la collection Femme dans chaque tranche d'âge.
Après avoir exécuté les tâches de mappage, de partitionnement et de réduction, les trois collections de données de paires clé-valeur sont stockées dans trois fichiers différents en tant que sortie.
Les trois tâches sont traitées comme des tâches MapReduce. Les exigences et spécifications suivantes de ces travaux doivent être spécifiées dans les configurations -
- Nom du travail
- Formats d'entrée et de sortie des clés et des valeurs
- Classes individuelles pour les tâches de mappage, de réduction et de partitionnement
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Exemple de programme
Le programme suivant montre comment implémenter les partitionneurs pour les critères donnés dans un programme MapReduce.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
Enregistrez le code ci-dessus sous PartitionerExample.javadans «/ home / hadoop / hadoopPartitioner». La compilation et l'exécution du programme sont données ci-dessous.
Compilation et exécution
Supposons que nous soyons dans le répertoire personnel de l'utilisateur Hadoop (par exemple, / home / hadoop).
Suivez les étapes ci-dessous pour compiler et exécuter le programme ci-dessus.
Step 1- Téléchargez Hadoop-core-1.2.1.jar, qui est utilisé pour compiler et exécuter le programme MapReduce. Vous pouvez télécharger le fichier jar sur mvnrepository.com .
Supposons que le dossier téléchargé soit "/ home / hadoop / hadoopPartitioner"
Step 2 - Les commandes suivantes sont utilisées pour compiler le programme PartitionerExample.java et créer un bocal pour le programme.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 - Utilisez la commande suivante pour créer un répertoire d'entrée dans HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - Utilisez la commande suivante pour copier le fichier d'entrée nommé input.txt dans le répertoire d'entrée de HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - Utilisez la commande suivante pour vérifier les fichiers dans le répertoire d'entrée.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - Utilisez la commande suivante pour exécuter l'application Top Salaire en prenant les fichiers d'entrée dans le répertoire d'entrée.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Attendez un moment jusqu'à ce que le fichier soit exécuté. Après l'exécution, la sortie contient un certain nombre de fractionnements d'entrée, de tâches de mappage et de tâches de réduction.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - Utilisez la commande suivante pour vérifier les fichiers résultants dans le dossier de sortie.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Vous trouverez la sortie dans trois fichiers car vous utilisez trois partitionneurs et trois réducteurs dans votre programme.
Step 8 - Utilisez la commande suivante pour voir la sortie dans Part-00000fichier. Ce fichier est généré par HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
Utilisez la commande suivante pour voir la sortie dans Part-00001 fichier.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
Utilisez la commande suivante pour voir la sortie dans Part-00002 fichier.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000