MapReduce - Partitionierer

Ein Partitionierer funktioniert wie eine Bedingung bei der Verarbeitung eines Eingabedatensatzes. Die Partitionsphase findet nach der Map-Phase und vor der Reduce-Phase statt.

Die Anzahl der Partitionierer entspricht der Anzahl der Reduzierer. Das bedeutet, dass ein Partitionierer die Daten nach der Anzahl der Reduzierer aufteilt. Daher werden die von einem einzelnen Partitionierer übergebenen Daten von einem einzelnen Reduzierer verarbeitet.

Partitionierer

Ein Partitionierer partitioniert die Schlüssel-Wert-Paare von Map-Zwischenausgaben. Es partitioniert die Daten mithilfe einer benutzerdefinierten Bedingung, die wie eine Hash-Funktion funktioniert. Die Gesamtzahl der Partitionen entspricht der Anzahl der Reducer-Aufgaben für den Job. Nehmen wir ein Beispiel, um zu verstehen, wie der Partitionierer funktioniert.

MapReduce Partitioner-Implementierung

Nehmen wir der Einfachheit halber an, wir haben eine kleine Tabelle mit dem Namen Mitarbeiter mit den folgenden Daten. Wir werden diese Beispieldaten als Eingabedatensatz verwenden, um die Funktionsweise des Partitionierers zu demonstrieren.

Ich würde Name Alter Geschlecht Gehalt
1201 gopal 45 Männlich 50.000
1202 Manisha 40 Weiblich 50.000
1203 Khalil 34 Männlich 30.000
1204 Prasanth 30 Männlich 30.000
1205 kiran 20 Männlich 40.000
1206 laxmi 25 Weiblich 35.000
1207 Bhavya 20 Weiblich 15.000
1208 Reshma 19 Weiblich 15.000
1209 kranthi 22 Männlich 22.000
1210 Satish 24 Männlich 25.000
1211 Krishna 25 Männlich 25.000
1212 Arshad 28 Männlich 20.000
1213 Lavanya 18 Weiblich 8.000

Wir müssen einen Antrag schreiben, um den Eingabedatensatz zu verarbeiten und den Mitarbeiter mit dem höchsten Gehalt nach Geschlecht in verschiedenen Altersgruppen zu finden (z. B. unter 20, zwischen 21 und 30, über 30).

Eingabedaten

Die obigen Daten werden als gespeichert input.txt im Verzeichnis "/ home / hadoop / hadoopPartitioner" und als Eingabe angegeben.

1201 gopal 45 Männlich 50000
1202 Manisha 40 Weiblich 51000
1203 khaleel 34 Männlich 30000
1204 Prasanth 30 Männlich 31000
1205 kiran 20 Männlich 40000
1206 laxmi 25 Weiblich 35000
1207 Bhavya 20 Weiblich 15000
1208 Reshma 19 Weiblich 14000
1209 kranthi 22 Männlich 22000
1210 Satish 24 Männlich 25000
1211 Krishna 25 Männlich 26000
1212 Arshad 28 Männlich 20000
1213 Lavanya 18 Weiblich 8000

Basierend auf der gegebenen Eingabe folgt die algorithmische Erklärung des Programms.

Kartenaufgaben

Die Zuordnungsaufgabe akzeptiert die Schlüssel-Wert-Paare als Eingabe, während wir die Textdaten in einer Textdatei haben. Die Eingabe für diese Kartenaufgabe lautet wie folgt:

Input - Der Schlüssel wäre ein Muster wie "Beliebiger Sonderschlüssel + Dateiname + Zeilennummer" (Beispiel: Schlüssel = @ Eingabe1) und der Wert wären die Daten in dieser Zeile (Beispiel: Wert = 1201 \ t gopal \ t 45 \ t Männlich \ t 50000).

Method - Diese Kartenaufgabe funktioniert wie folgt:

  • Lies das value (Datensatzdaten), der als Eingabewert aus der Argumentliste in einer Zeichenfolge stammt.

  • Trennen Sie mit der Split-Funktion das Geschlecht und speichern Sie es in einer Zeichenfolgenvariablen.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Senden Sie die Geschlechtsinformationen und die Datensatzdaten value als Ausgabe-Schlüssel-Wert-Paar von der Zuordnungsaufgabe zum partition task.

context.write(new Text(gender), new Text(value));
  • Wiederholen Sie alle obigen Schritte für alle Datensätze in der Textdatei.

Output - Sie erhalten die Geschlechtsdaten und den Datensatzdatenwert als Schlüssel-Wert-Paare.

Partitioniereraufgabe

Die Partitionierungsaufgabe akzeptiert die Schlüssel-Wert-Paare aus der Zuordnungsaufgabe als Eingabe. Partitionierung bedeutet, die Daten in Segmente zu unterteilen. Entsprechend den gegebenen bedingten Kriterien für Partitionen können die gepaarten Eingabeschlüsselwertdaten basierend auf den Alterskriterien in drei Teile unterteilt werden.

Input - Die gesamten Daten in einer Sammlung von Schlüssel-Wert-Paaren.

key = Feldwert Geschlecht im Datensatz.

value = Gesamtwert der Datensatzdaten dieses Geschlechts.

Method - Der Prozess der Partitionslogik läuft wie folgt ab.

  • Lesen Sie den Altersfeldwert aus dem eingegebenen Schlüssel-Wert-Paar.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Überprüfen Sie den Alterswert unter den folgenden Bedingungen.

    • Alter kleiner oder gleich 20
    • Alter größer als 20 und kleiner als oder gleich 30.
    • Alter größer als 30.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Die gesamten Daten von Schlüssel-Wert-Paaren sind in drei Sammlungen von Schlüssel-Wert-Paaren unterteilt. Der Reducer arbeitet individuell an jeder Sammlung.

Aufgaben reduzieren

Die Anzahl der Partitioniereraufgaben entspricht der Anzahl der Reduzierungsaufgaben. Hier haben wir drei Partitionierer-Tasks und daher müssen drei Reducer-Tasks ausgeführt werden.

Input - Der Reduzierer wird dreimal mit einer unterschiedlichen Sammlung von Schlüssel-Wert-Paaren ausgeführt.

Schlüssel = Feldwert des Geschlechts im Datensatz.

value = die gesamten Datensatzdaten dieses Geschlechts.

Method - Die folgende Logik wird auf jede Sammlung angewendet.

  • Lesen Sie den Feldwert für das Gehalt jedes Datensatzes.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Überprüfen Sie das Gehalt mit der Maximalvariablen. Wenn str [4] das maximale Gehalt ist, weisen Sie str [4] max zu, andernfalls überspringen Sie den Schritt.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Wiederholen Sie die Schritte 1 und 2 für jede Schlüsselsammlung (männlich und weiblich sind die Schlüsselsammlungen). Nachdem Sie diese drei Schritte ausgeführt haben, finden Sie ein maximales Gehalt aus der Sammlung männlicher Schlüssel und ein maximales Gehalt aus der Sammlung weiblicher Schlüssel.

context.write(new Text(key), new IntWritable(max));

Output- Schließlich erhalten Sie eine Reihe von Schlüssel-Wert-Paar-Daten in drei Sammlungen verschiedener Altersgruppen. Es enthält das maximale Gehalt aus der männlichen Sammlung und das maximale Gehalt aus der weiblichen Sammlung in jeder Altersgruppe.

Nach dem Ausführen der Aufgaben Map, Partitioner und Reduce werden die drei Sammlungen von Schlüssel-Wert-Paar-Daten als Ausgabe in drei verschiedenen Dateien gespeichert.

Alle drei Aufgaben werden als MapReduce-Jobs behandelt. Die folgenden Anforderungen und Spezifikationen dieser Jobs sollten in den Konfigurationen angegeben werden -

  • Berufsbezeichnung
  • Eingabe- und Ausgabeformate von Schlüsseln und Werten
  • Einzelne Klassen für Map-, Reduce- und Partitioner-Aufgaben
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Beispielprogramm

Das folgende Programm zeigt, wie die Partitionierer für die angegebenen Kriterien in einem MapReduce-Programm implementiert werden.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Speichern Sie den obigen Code als PartitionerExample.javain "/ home / hadoop / hadoopPartitioner". Die Zusammenstellung und Ausführung des Programms ist unten angegeben.

Zusammenstellung und Ausführung

Nehmen wir an, wir befinden uns im Home-Verzeichnis des Hadoop-Benutzers (z. B. / home / hadoop).

Führen Sie die folgenden Schritte aus, um das obige Programm zu kompilieren und auszuführen.

Step 1- Laden Sie Hadoop-core-1.2.1.jar herunter, mit dem das MapReduce-Programm kompiliert und ausgeführt wird. Sie können das Glas von mvnrepository.com herunterladen .

Nehmen wir an, der heruntergeladene Ordner lautet "/ home / hadoop / hadoopPartitioner".

Step 2 - Die folgenden Befehle werden zum Kompilieren des Programms verwendet PartitionerExample.java und Erstellen eines Glases für das Programm.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - Verwenden Sie den folgenden Befehl, um ein Eingabeverzeichnis in HDFS zu erstellen.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Verwenden Sie den folgenden Befehl, um die benannte Eingabedatei zu kopieren input.txt im Eingabeverzeichnis von HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Verwenden Sie den folgenden Befehl, um die Dateien im Eingabeverzeichnis zu überprüfen.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Verwenden Sie den folgenden Befehl, um die Anwendung "Top-Gehalt" auszuführen, indem Sie Eingabedateien aus dem Eingabeverzeichnis entnehmen.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Warten Sie eine Weile, bis die Datei ausgeführt wird. Nach der Ausführung enthält die Ausgabe eine Reihe von Eingabeaufteilungen, Zuordnungsaufgaben und Reduzierungsaufgaben.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Verwenden Sie den folgenden Befehl, um die resultierenden Dateien im Ausgabeordner zu überprüfen.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Sie finden die Ausgabe in drei Dateien, da Sie in Ihrem Programm drei Partitionierer und drei Reduzierer verwenden.

Step 8 - Verwenden Sie den folgenden Befehl, um die Ausgabe in anzuzeigen Part-00000Datei. Diese Datei wird von HDFS generiert.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Verwenden Sie den folgenden Befehl, um die Ausgabe in anzuzeigen Part-00001 Datei.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Verwenden Sie den folgenden Befehl, um die Ausgabe in anzuzeigen Part-00002 Datei.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000