MapReduce - Partitioner

Partycjoner działa jak warunek podczas przetwarzania wejściowego zestawu danych. Faza podziału ma miejsce po fazie mapy i przed fazą redukcji.

Liczba partycjonerów jest równa liczbie reduktorów. Oznacza to, że partycjoner podzieli dane zgodnie z liczbą reduktorów. Dlatego dane przekazywane z jednego partycjonera są przetwarzane przez jeden reduktor.

Partitioner

Partycjoner dzieli pary klucz-wartość pośrednich wyjść mapy. Dzieli dane na partycje za pomocą warunku zdefiniowanego przez użytkownika, który działa jak funkcja skrótu. Całkowita liczba partycji jest taka sama, jak liczba zadań Reduktora dla zadania. Weźmy przykład, aby zrozumieć, jak działa partycjoner.

Implementacja MapReduce Partitioner

Dla wygody załóżmy, że mamy małą tabelkę o nazwie Pracownik z następującymi danymi. Użyjemy tych przykładowych danych jako naszego zestawu danych wejściowych, aby zademonstrować, jak działa partycjoner.

ID Nazwa Wiek Płeć Wynagrodzenie
1201 gopal 45 Męski 50 000
1202 manisha 40 Płeć żeńska 50 000
1203 khalil 34 Męski 30 000
1204 prasanth 30 Męski 30 000
1205 kiran 20 Męski 40 000
1206 laxmi 25 Płeć żeńska 35 000
1207 bhavya 20 Płeć żeńska 15 000
1208 reshma 19 Płeć żeńska 15 000
1209 kranthi 22 Męski 22 000
1210 Satish 24 Męski 25 000
1211 Kryszna 25 Męski 25 000
1212 Arshad 28 Męski 20000
1213 Lavanya 18 Płeć żeńska 8,000

Musimy napisać aplikację do przetwarzania zbioru danych wejściowych, aby znaleźć pracownika o najwyższym wynagrodzeniu według płci w różnych grupach wiekowych (na przykład poniżej 20 lat, od 21 do 30 lat, powyżej 30 lat).

Dane wejściowe

Powyższe dane są zapisywane jako input.txt w katalogu „/ home / hadoop / hadoopPartitioner” i podany jako dane wejściowe.

1201 gopal 45 Męski 50000
1202 manisha 40 Płeć żeńska 51000
1203 khaleel 34 Męski 30000
1204 prasanth 30 Męski 31000
1205 kiran 20 Męski 40000
1206 laxmi 25 Płeć żeńska 35000
1207 bhavya 20 Płeć żeńska 15000
1208 reshma 19 Płeć żeńska 14000
1209 kranthi 22 Męski 22000
1210 Satish 24 Męski 25000
1211 Kryszna 25 Męski 26000
1212 Arshad 28 Męski 20000
1213 Lavanya 18 Płeć żeńska 8000

Na podstawie podanych danych wejściowych następuje algorytmiczne wyjaśnienie programu.

Zadania mapy

Zadanie mapy akceptuje pary klucz-wartość jako dane wejściowe, podczas gdy dane tekstowe są w pliku tekstowym. Dane wejściowe dla tego zadania mapy są następujące -

Input - Kluczem byłby wzorzec, taki jak „dowolny klawisz specjalny + nazwa pliku + numer wiersza” (przykład: klucz = @ input1), a wartością byłyby dane w tym wierszu (przykład: wartość = 1201 \ t gopal \ t 45 \ t Mężczyzna \ t 50000).

Method - Działanie tego zadania mapy jest następujące -

  • Przeczytać value (dane rekordu), która jest wartością wejściową z listy argumentów w ciągu.

  • Używając funkcji podziału, oddziel płeć i zapisz w zmiennej ciągu.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Wyślij informacje o płci i dane rekordu value jako wyjściowa para klucz-wartość z zadania mapy do pliku partition task.

context.write(new Text(gender), new Text(value));
  • Powtórz wszystkie powyższe kroki dla wszystkich rekordów w pliku tekstowym.

Output - Otrzymasz dane dotyczące płci i wartości danych rekordu jako pary klucz-wartość.

Zadanie partycjonera

Zadanie partycjonowania akceptuje pary klucz-wartość z zadania mapy jako dane wejściowe. Partycja oznacza podział danych na segmenty. Zgodnie z podanymi warunkowymi kryteriami partycji, wejściowe sparowane dane klucz-wartość można podzielić na trzy części w oparciu o kryterium wieku.

Input - całe dane w zbiorze par klucz-wartość.

klucz = Wartość pola Płeć w rekordzie.

wartość = Wartość danych całego rekordu tej płci.

Method - Proces logiki partycji przebiega w następujący sposób.

  • Odczytaj wartość pola wieku z wejściowej pary klucz-wartość.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Sprawdź wartość wieku z następującymi warunkami.

    • Wiek poniżej 20 lat lub równy
    • Wiek: powyżej 20 lat i mniej niż lub równy 30.
    • Wiek powyżej 30 lat.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Całe dane par klucz-wartość są podzielone na trzy zbiory par klucz-wartość. Reducer działa indywidualnie na każdej kolekcji.

Zmniejsz liczbę zadań

Liczba zadań partycjonowania jest równa liczbie zadań reduktora. Tutaj mamy trzy zadania partycjonera, a zatem mamy do wykonania trzy zadania Reduktora.

Input - Reducer wykona trzykrotne wykonanie z różną kolekcją par klucz-wartość.

klucz = wartość pola płci w rekordzie.

wartość = wszystkie dane rekordu tej płci.

Method - Następująca logika zostanie zastosowana do każdej kolekcji.

  • Przeczytaj wartość pola Salary każdego rekordu.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Sprawdź wynagrodzenie ze zmienną max. Jeśli str [4] to maksymalna pensja, przypisz str [4] do max, w przeciwnym razie pomiń ten krok.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Powtórz kroki 1 i 2 dla każdego odbioru kluczy (kluczami są mężczyźni i kobiety). Po wykonaniu tych trzech kroków znajdziesz jedną maksymalną pensję z kolekcji kluczy dla mężczyzn i jedną maksymalną pensję z kolekcji kluczy dla kobiet.

context.write(new Text(key), new IntWritable(max));

Output- Na koniec otrzymasz zestaw danych par klucz-wartość w trzech kolekcjach z różnych grup wiekowych. Zawiera odpowiednio maksymalne wynagrodzenie z kolekcji Mężczyzna i maksymalne wynagrodzenie z kolekcji Kobieta w każdej grupie wiekowej.

Po wykonaniu zadań Map, Partitioner i Reduce, trzy kolekcje danych par klucz-wartość są przechowywane w trzech różnych plikach jako dane wyjściowe.

Wszystkie trzy zadania są traktowane jako zadania MapReduce. Poniższe wymagania i specyfikacje tych zadań należy określić w Konfiguracjach -

  • Nazwa pracy
  • Formaty wejściowe i wyjściowe kluczy i wartości
  • Indywidualne klasy dla zadań Map, Reduce i Partitioner
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Przykładowy program

Poniższy program pokazuje, jak zaimplementować partycje dla podanych kryteriów w programie MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Zapisz powyższy kod jako PartitionerExample.javaw „/ home / hadoop / hadoopPartitioner”. Kompilację i wykonanie programu podano poniżej.

Kompilacja i wykonanie

Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (na przykład / home / hadoop).

Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.

Step 1- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Możesz pobrać jar ze strony mvnrepository.com .

Załóżmy, że pobrany folder to „/ home / hadoop / hadoopPartitioner”

Step 2 - Poniższe polecenia służą do kompilowania programu PartitionerExample.java i stworzenie słoika dla programu.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - Użyj następującego polecenia, aby utworzyć katalog wejściowy w formacie HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Użyj następującego polecenia, aby skopiować plik wejściowy o nazwie input.txt w katalogu wejściowym HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Użyj następującego polecenia, aby zweryfikować pliki w katalogu wejściowym.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Użyj następującego polecenia, aby uruchomić aplikację z najwyższymi wynagrodzeniami, pobierając pliki wejściowe z katalogu wejściowego.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania i zadań reduktora.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Użyj następującego polecenia, aby zweryfikować pliki wynikowe w folderze wyjściowym.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Dane wyjściowe znajdziesz w trzech plikach, ponieważ używasz w swoim programie trzech partycjonerów i trzech reduktorów.

Step 8 - Użyj następującego polecenia, aby wyświetlić dane wyjściowe w formacie Part-00000plik. Ten plik jest generowany przez HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Użyj następującego polecenia, aby wyświetlić dane wyjściowe w Part-00001 plik.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Użyj następującego polecenia, aby wyświetlić dane wyjściowe w Part-00002 plik.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000