MapReduce - Partitioner

Un partitioner funziona come una condizione nell'elaborazione di un set di dati di input. La fase di partizione avviene dopo la fase Mappa e prima della fase Riduci.

Il numero di partizionatori è uguale al numero di riduttori. Ciò significa che un partizionatore dividerà i dati in base al numero di riduttori. Pertanto, i dati passati da un singolo partitioner vengono elaborati da un singolo Reducer.

Partitioner

Un partizionatore partiziona le coppie chiave-valore degli output intermedi della mappa. Partiziona i dati utilizzando una condizione definita dall'utente, che funziona come una funzione hash. Il numero totale di partizioni è uguale al numero di attività di Reducer per il lavoro. Facciamo un esempio per capire come funziona il partitioner.

Implementazione del partizionatore MapReduce

Per comodità, supponiamo di avere una piccola tabella chiamata Employee con i seguenti dati. Useremo questi dati di esempio come set di dati di input per dimostrare come funziona il partitioner.

Id Nome Età Genere Stipendio
1201 gopal 45 Maschio 50.000
1202 manisha 40 Femmina 50.000
1203 khalil 34 Maschio 30.000
1204 prasanth 30 Maschio 30.000
1205 kiran 20 Maschio 40.000
1206 laxmi 25 Femmina 35.000
1207 bhavya 20 Femmina 15.000
1208 reshma 19 Femmina 15.000
1209 kranthi 22 Maschio 22.000
1210 Satish 24 Maschio 25.000
1211 Krishna 25 Maschio 25.000
1212 Arshad 28 Maschio 20.000
1213 lavanya 18 Femmina 8.000

Dobbiamo scrivere una domanda per elaborare il set di dati di input per trovare il dipendente più salariato in base al sesso in diverse fasce di età (ad esempio, sotto i 20 anni, tra i 21 ei 30 anni, sopra i 30).

Dati in ingresso

I dati di cui sopra vengono salvati come input.txt nella directory "/ home / hadoop / hadoopPartitioner" e fornito come input.

1201 gopal 45 Maschio 50000
1202 manisha 40 Femmina 51000
1203 khaleel 34 Maschio 30000
1204 prasanth 30 Maschio 31000
1205 kiran 20 Maschio 40000
1206 laxmi 25 Femmina 35000
1207 bhavya 20 Femmina 15000
1208 reshma 19 Femmina 14000
1209 kranthi 22 Maschio 22000
1210 Satish 24 Maschio 25000
1211 Krishna 25 Maschio 26000
1212 Arshad 28 Maschio 20000
1213 lavanya 18 Femmina 8000

Sulla base dell'input fornito, di seguito viene fornita la spiegazione algoritmica del programma.

Attività sulla mappa

L'attività di mappa accetta le coppie chiave-valore come input mentre abbiamo i dati di testo in un file di testo. L'input per questa attività di mappa è il seguente:

Input - La chiave sarebbe un modello come "qualsiasi chiave speciale + nome file + numero di riga" (esempio: chiave = @ input1) e il valore sarebbe i dati in quella riga (esempio: valore = 1201 \ t gopal \ t 45 \ t Maschio \ t 50000).

Method - Il funzionamento di questa attività di mappa è il seguente:

  • Leggi il value (record data), che viene fornito come valore di input dall'elenco di argomenti in una stringa.

  • Utilizzando la funzione split, separare il sesso e memorizzare in una variabile stringa.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Invia le informazioni sul sesso e i dati del record value come coppia chiave-valore di output dall'attività di mappa al file partition task.

context.write(new Text(gender), new Text(value));
  • Ripetere tutti i passaggi precedenti per tutti i record nel file di testo.

Output - Otterrai i dati sul sesso e il valore dei dati del record come coppie chiave-valore.

Attività partizionatore

L'attività di partizionamento accetta le coppie chiave-valore dall'attività di mappa come input. La partizione implica la divisione dei dati in segmenti. In base ai criteri condizionali delle partizioni forniti, i dati accoppiati valore-chiave di input possono essere suddivisi in tre parti in base ai criteri di età.

Input - Tutti i dati in una raccolta di coppie chiave-valore.

chiave = valore del campo Sesso nel record.

valore = valore dei dati dell'intero record di quel genere.

Method - Il processo della logica di partizione viene eseguito come segue.

  • Leggere il valore del campo età dalla coppia chiave-valore di input.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Verificare il valore dell'età con le seguenti condizioni.

    • Età inferiore o uguale a 20 anni
    • Età maggiore di 20 e minore o uguale a 30.
    • Età superiore a 30 anni.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Tutti i dati delle coppie chiave-valore sono segmentati in tre raccolte di coppie chiave-valore. Il Reducer funziona individualmente su ogni collezione.

Riduci le attività

Il numero di attività del partizionatore è uguale al numero di attività del riduttore. Qui abbiamo tre attività di partizionamento e quindi abbiamo tre attività di Reducer da eseguire.

Input - Il Reducer verrà eseguito tre volte con una raccolta diversa di coppie chiave-valore.

chiave = valore del campo sesso nel record.

valore = tutti i dati del record di quel genere.

Method - La seguente logica verrà applicata a ciascuna raccolta.

  • Leggi il valore del campo Stipendio di ogni record.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Controlla lo stipendio con la variabile max. Se str [4] è il salario massimo, assegna str [4] al massimo, altrimenti salta il passaggio.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Ripeti i passaggi 1 e 2 per ogni ritiro delle chiavi (maschio e femmina sono le collezioni delle chiavi). Dopo aver eseguito questi tre passaggi, troverai uno stipendio massimo dalla consegna delle chiavi maschile e uno stipendio massimo dalla consegna delle chiavi femminile.

context.write(new Text(key), new IntWritable(max));

Output- Infine, otterrai una serie di dati sulla coppia chiave-valore in tre raccolte di diversi gruppi di età. Contiene rispettivamente lo stipendio massimo della raccolta Maschile e lo stipendio massimo della raccolta Femminile in ciascuna fascia di età.

Dopo aver eseguito le attività Mappa, Partitioner e Riduci, le tre raccolte di dati di coppie chiave-valore vengono archiviate in tre file diversi come output.

Tutte e tre le attività vengono trattate come lavori MapReduce. I seguenti requisiti e specifiche di questi lavori devono essere specificati nelle Configurazioni:

  • Nome del lavoro
  • Formati di input e output di chiavi e valori
  • Classi individuali per attività di mappatura, riduzione e partizionamento
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Programma di esempio

Il seguente programma mostra come implementare i partizionatori per i criteri specificati in un programma MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Salva il codice sopra come PartitionerExample.javain "/ home / hadoop / hadoopPartitioner". Di seguito viene fornita la compilazione e l'esecuzione del programma.

Compilazione ed esecuzione

Supponiamo di essere nella directory home dell'utente Hadoop (ad esempio, / home / hadoop).

Seguire i passaggi indicati di seguito per compilare ed eseguire il programma sopra.

Step 1- Scarica Hadoop-core-1.2.1.jar, che viene utilizzato per compilare ed eseguire il programma MapReduce. Puoi scaricare il jar da mvnrepository.com .

Supponiamo che la cartella scaricata sia "/ home / hadoop / hadoopPartitioner"

Step 2 - I seguenti comandi vengono utilizzati per compilare il programma PartitionerExample.java e creare un vaso per il programma.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - Utilizzare il comando seguente per creare una directory di input in HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Utilizzare il seguente comando per copiare il file di input denominato input.txt nella directory di input di HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Utilizzare il seguente comando per verificare i file nella directory di input.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Utilizzare il seguente comando per eseguire l'applicazione Top salary prendendo i file di input dalla directory di input.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Attendi qualche istante finché il file non viene eseguito. Dopo l'esecuzione, l'output contiene una serie di suddivisioni di input, attività di mappatura e attività di riduzione.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Utilizzare il seguente comando per verificare i file risultanti nella cartella di output.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Troverai l'output in tre file perché stai usando tre partitioner e tre Reducers nel tuo programma.

Step 8 - Usa il seguente comando per vedere l'output in Part-00000file. Questo file è generato da HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Usa il seguente comando per vedere l'output in Part-00001 file.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Usa il seguente comando per vedere l'output in Part-00002 file.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000