MapReduce - Partitionierer
Ein Partitionierer funktioniert wie eine Bedingung bei der Verarbeitung eines Eingabedatensatzes. Die Partitionsphase findet nach der Map-Phase und vor der Reduce-Phase statt.
Die Anzahl der Partitionierer entspricht der Anzahl der Reduzierer. Das bedeutet, dass ein Partitionierer die Daten nach der Anzahl der Reduzierer aufteilt. Daher werden die von einem einzelnen Partitionierer übergebenen Daten von einem einzelnen Reduzierer verarbeitet.
Partitionierer
Ein Partitionierer partitioniert die Schlüssel-Wert-Paare von Map-Zwischenausgaben. Es partitioniert die Daten mithilfe einer benutzerdefinierten Bedingung, die wie eine Hash-Funktion funktioniert. Die Gesamtzahl der Partitionen entspricht der Anzahl der Reducer-Aufgaben für den Job. Nehmen wir ein Beispiel, um zu verstehen, wie der Partitionierer funktioniert.
MapReduce Partitioner-Implementierung
Nehmen wir der Einfachheit halber an, wir haben eine kleine Tabelle mit dem Namen Mitarbeiter mit den folgenden Daten. Wir werden diese Beispieldaten als Eingabedatensatz verwenden, um die Funktionsweise des Partitionierers zu demonstrieren.
Ich würde | Name | Alter | Geschlecht | Gehalt |
---|---|---|---|---|
1201 | gopal | 45 | Männlich | 50.000 |
1202 | Manisha | 40 | Weiblich | 50.000 |
1203 | Khalil | 34 | Männlich | 30.000 |
1204 | Prasanth | 30 | Männlich | 30.000 |
1205 | kiran | 20 | Männlich | 40.000 |
1206 | laxmi | 25 | Weiblich | 35.000 |
1207 | Bhavya | 20 | Weiblich | 15.000 |
1208 | Reshma | 19 | Weiblich | 15.000 |
1209 | kranthi | 22 | Männlich | 22.000 |
1210 | Satish | 24 | Männlich | 25.000 |
1211 | Krishna | 25 | Männlich | 25.000 |
1212 | Arshad | 28 | Männlich | 20.000 |
1213 | Lavanya | 18 | Weiblich | 8.000 |
Wir müssen einen Antrag schreiben, um den Eingabedatensatz zu verarbeiten und den Mitarbeiter mit dem höchsten Gehalt nach Geschlecht in verschiedenen Altersgruppen zu finden (z. B. unter 20, zwischen 21 und 30, über 30).
Eingabedaten
Die obigen Daten werden als gespeichert input.txt im Verzeichnis "/ home / hadoop / hadoopPartitioner" und als Eingabe angegeben.
1201 | gopal | 45 | Männlich | 50000 |
1202 | Manisha | 40 | Weiblich | 51000 |
1203 | khaleel | 34 | Männlich | 30000 |
1204 | Prasanth | 30 | Männlich | 31000 |
1205 | kiran | 20 | Männlich | 40000 |
1206 | laxmi | 25 | Weiblich | 35000 |
1207 | Bhavya | 20 | Weiblich | 15000 |
1208 | Reshma | 19 | Weiblich | 14000 |
1209 | kranthi | 22 | Männlich | 22000 |
1210 | Satish | 24 | Männlich | 25000 |
1211 | Krishna | 25 | Männlich | 26000 |
1212 | Arshad | 28 | Männlich | 20000 |
1213 | Lavanya | 18 | Weiblich | 8000 |
Basierend auf der gegebenen Eingabe folgt die algorithmische Erklärung des Programms.
Kartenaufgaben
Die Zuordnungsaufgabe akzeptiert die Schlüssel-Wert-Paare als Eingabe, während wir die Textdaten in einer Textdatei haben. Die Eingabe für diese Kartenaufgabe lautet wie folgt:
Input - Der Schlüssel wäre ein Muster wie "Beliebiger Sonderschlüssel + Dateiname + Zeilennummer" (Beispiel: Schlüssel = @ Eingabe1) und der Wert wären die Daten in dieser Zeile (Beispiel: Wert = 1201 \ t gopal \ t 45 \ t Männlich \ t 50000).
Method - Diese Kartenaufgabe funktioniert wie folgt:
Lies das value (Datensatzdaten), der als Eingabewert aus der Argumentliste in einer Zeichenfolge stammt.
Trennen Sie mit der Split-Funktion das Geschlecht und speichern Sie es in einer Zeichenfolgenvariablen.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
Senden Sie die Geschlechtsinformationen und die Datensatzdaten value als Ausgabe-Schlüssel-Wert-Paar von der Zuordnungsaufgabe zum partition task.
context.write(new Text(gender), new Text(value));
Wiederholen Sie alle obigen Schritte für alle Datensätze in der Textdatei.
Output - Sie erhalten die Geschlechtsdaten und den Datensatzdatenwert als Schlüssel-Wert-Paare.
Partitioniereraufgabe
Die Partitionierungsaufgabe akzeptiert die Schlüssel-Wert-Paare aus der Zuordnungsaufgabe als Eingabe. Partitionierung bedeutet, die Daten in Segmente zu unterteilen. Entsprechend den gegebenen bedingten Kriterien für Partitionen können die gepaarten Eingabeschlüsselwertdaten basierend auf den Alterskriterien in drei Teile unterteilt werden.
Input - Die gesamten Daten in einer Sammlung von Schlüssel-Wert-Paaren.
key = Feldwert Geschlecht im Datensatz.
value = Gesamtwert der Datensatzdaten dieses Geschlechts.
Method - Der Prozess der Partitionslogik läuft wie folgt ab.
- Lesen Sie den Altersfeldwert aus dem eingegebenen Schlüssel-Wert-Paar.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Überprüfen Sie den Alterswert unter den folgenden Bedingungen.
- Alter kleiner oder gleich 20
- Alter größer als 20 und kleiner als oder gleich 30.
- Alter größer als 30.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- Die gesamten Daten von Schlüssel-Wert-Paaren sind in drei Sammlungen von Schlüssel-Wert-Paaren unterteilt. Der Reducer arbeitet individuell an jeder Sammlung.
Aufgaben reduzieren
Die Anzahl der Partitioniereraufgaben entspricht der Anzahl der Reduzierungsaufgaben. Hier haben wir drei Partitionierer-Tasks und daher müssen drei Reducer-Tasks ausgeführt werden.
Input - Der Reduzierer wird dreimal mit einer unterschiedlichen Sammlung von Schlüssel-Wert-Paaren ausgeführt.
Schlüssel = Feldwert des Geschlechts im Datensatz.
value = die gesamten Datensatzdaten dieses Geschlechts.
Method - Die folgende Logik wird auf jede Sammlung angewendet.
- Lesen Sie den Feldwert für das Gehalt jedes Datensatzes.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
Überprüfen Sie das Gehalt mit der Maximalvariablen. Wenn str [4] das maximale Gehalt ist, weisen Sie str [4] max zu, andernfalls überspringen Sie den Schritt.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Wiederholen Sie die Schritte 1 und 2 für jede Schlüsselsammlung (männlich und weiblich sind die Schlüsselsammlungen). Nachdem Sie diese drei Schritte ausgeführt haben, finden Sie ein maximales Gehalt aus der Sammlung männlicher Schlüssel und ein maximales Gehalt aus der Sammlung weiblicher Schlüssel.
context.write(new Text(key), new IntWritable(max));
Output- Schließlich erhalten Sie eine Reihe von Schlüssel-Wert-Paar-Daten in drei Sammlungen verschiedener Altersgruppen. Es enthält das maximale Gehalt aus der männlichen Sammlung und das maximale Gehalt aus der weiblichen Sammlung in jeder Altersgruppe.
Nach dem Ausführen der Aufgaben Map, Partitioner und Reduce werden die drei Sammlungen von Schlüssel-Wert-Paar-Daten als Ausgabe in drei verschiedenen Dateien gespeichert.
Alle drei Aufgaben werden als MapReduce-Jobs behandelt. Die folgenden Anforderungen und Spezifikationen dieser Jobs sollten in den Konfigurationen angegeben werden -
- Berufsbezeichnung
- Eingabe- und Ausgabeformate von Schlüsseln und Werten
- Einzelne Klassen für Map-, Reduce- und Partitioner-Aufgaben
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Beispielprogramm
Das folgende Programm zeigt, wie die Partitionierer für die angegebenen Kriterien in einem MapReduce-Programm implementiert werden.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
Speichern Sie den obigen Code als PartitionerExample.javain "/ home / hadoop / hadoopPartitioner". Die Zusammenstellung und Ausführung des Programms ist unten angegeben.
Zusammenstellung und Ausführung
Nehmen wir an, wir befinden uns im Home-Verzeichnis des Hadoop-Benutzers (z. B. / home / hadoop).
Führen Sie die folgenden Schritte aus, um das obige Programm zu kompilieren und auszuführen.
Step 1- Laden Sie Hadoop-core-1.2.1.jar herunter, mit dem das MapReduce-Programm kompiliert und ausgeführt wird. Sie können das Glas von mvnrepository.com herunterladen .
Nehmen wir an, der heruntergeladene Ordner lautet "/ home / hadoop / hadoopPartitioner".
Step 2 - Die folgenden Befehle werden zum Kompilieren des Programms verwendet PartitionerExample.java und Erstellen eines Glases für das Programm.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 - Verwenden Sie den folgenden Befehl, um ein Eingabeverzeichnis in HDFS zu erstellen.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - Verwenden Sie den folgenden Befehl, um die benannte Eingabedatei zu kopieren input.txt im Eingabeverzeichnis von HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - Verwenden Sie den folgenden Befehl, um die Dateien im Eingabeverzeichnis zu überprüfen.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - Verwenden Sie den folgenden Befehl, um die Anwendung "Top-Gehalt" auszuführen, indem Sie Eingabedateien aus dem Eingabeverzeichnis entnehmen.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Warten Sie eine Weile, bis die Datei ausgeführt wird. Nach der Ausführung enthält die Ausgabe eine Reihe von Eingabeaufteilungen, Zuordnungsaufgaben und Reduzierungsaufgaben.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - Verwenden Sie den folgenden Befehl, um die resultierenden Dateien im Ausgabeordner zu überprüfen.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Sie finden die Ausgabe in drei Dateien, da Sie in Ihrem Programm drei Partitionierer und drei Reduzierer verwenden.
Step 8 - Verwenden Sie den folgenden Befehl, um die Ausgabe in anzuzeigen Part-00000Datei. Diese Datei wird von HDFS generiert.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
Verwenden Sie den folgenden Befehl, um die Ausgabe in anzuzeigen Part-00001 Datei.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
Verwenden Sie den folgenden Befehl, um die Ausgabe in anzuzeigen Part-00002 Datei.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000