MapReduce - Combiners

Combiner, znany również jako semi-reducer, jest klasą opcjonalną, która działa na zasadzie akceptowania danych wejściowych z klasy Map, a następnie przekazywania wyjściowych par klucz-wartość do klasy Reducer.

Główną funkcją Combinera jest podsumowanie rekordów wyjściowych mapy za pomocą tego samego klucza. Dane wyjściowe (kolekcja klucz-wartość) sumatora zostaną przesłane przez sieć do rzeczywistego zadania Reduktora jako dane wejściowe.

Combiner

Klasa Combiner jest używana między klasą Map a klasą Reduce w celu zmniejszenia ilości przesyłanych danych między Mapami i Reduce. Zwykle wynik zadania mapy jest duży, a dane przesyłane do zadania redukcji są duże.

Poniższy diagram zadania MapReduce przedstawia fazę łączenia.

Jak działa Combiner?

Oto krótkie podsumowanie działania MapReduce Combiner -

  • Sumator nie ma wstępnie zdefiniowanego interfejsu i musi implementować metodę reduktora () interfejsu Reducer.

  • Na każdym kluczu wyjściowym mapy działa sumator. Musi mieć te same wyjściowe typy klucz-wartość co klasa Reducer.

  • Sumator może generować informacje podsumowujące z dużego zestawu danych, ponieważ zastępuje oryginalne dane wyjściowe mapy.

Chociaż Combiner jest opcjonalny, ale pomaga segregować dane na wiele grup w fazie Reduce, co ułatwia przetwarzanie.

Implementacja MapReduce Combiner

Poniższy przykład przedstawia teoretyczne pojęcie o łącznikach. Załóżmy, że mamy następujący plik wejściowy o nazwieinput.txt dla MapReduce.

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

Poniżej omówiono ważne etapy działania programu MapReduce z programem Combiner.

Record Reader

Jest to pierwsza faza MapReduce, w której czytnik rekordów odczytuje każdy wiersz z wejściowego pliku tekstowego jako tekst i zwraca dane wyjściowe jako pary klucz-wartość.

Input - Tekst wiersz po wierszu z pliku wejściowego.

Output- tworzy pary klucz-wartość. Poniżej przedstawiono zestaw oczekiwanych par klucz-wartość.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Faza mapy

Faza mapy pobiera dane wejściowe z czytnika rekordów, przetwarza je i generuje dane wyjściowe jako kolejny zestaw par klucz-wartość.

Input - Następująca para klucz-wartość jest wejściem pobranym z czytnika rekordów.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Faza Map odczytuje każdą parę klucz-wartość, oddziela każde słowo od wartości za pomocą StringTokenizer, traktuje każde słowo jako klucz, a liczbę tego słowa jako wartość. Poniższy fragment kodu przedstawia klasę Mapper i funkcję map.

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

Output - Oczekiwany wynik jest następujący -

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

Faza łączenia

Faza Combiner pobiera każdą parę klucz-wartość z fazy mapy, przetwarza ją i generuje dane wyjściowe jako key-value collection pary.

Input - Następująca para klucz-wartość to dane wejściowe pobrane z fazy mapy.

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

Faza Combiner odczytuje każdą parę klucz-wartość, łączy popularne słowa jako klucz i wartości jako kolekcję. Zwykle kod i działanie łącznika są podobne do kodu reduktora. Poniżej znajduje się fragment kodu dla deklaracji klas Mapper, Combiner i Reducer.

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

Output - Oczekiwany wynik jest następujący -

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Faza reduktora

Faza Reducer pobiera każdą parę kolekcji klucz-wartość z fazy Combiner, przetwarza ją i przekazuje dane wyjściowe jako pary klucz-wartość. Zwróć uwagę, że funkcja Combiner jest taka sama jak Reducer.

Input - Następująca para klucz-wartość jest wejściem pobranym z fazy łączenia.

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Faza reduktora odczytuje każdą parę klucz-wartość. Poniżej znajduje się fragment kodu Combiner.

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

Output - Oczekiwana moc wyjściowa z fazy reduktora jest następująca -

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Autor nagrań

Jest to ostatnia faza MapReduce, w której zapisujący rekord zapisuje każdą parę klucz-wartość z fazy Reducer i wysyła dane wyjściowe jako tekst.

Input - Każda para klucz-wartość z fazy reduktora wraz z formatem wyjściowym.

Output- Daje pary klucz-wartość w formacie tekstowym. Poniżej przedstawiono oczekiwany wynik.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

Przykładowy program

Poniższy blok kodu zlicza liczbę słów w programie.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
		
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
		
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
		
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

Zapisz powyższy program jako WordCount.java. Kompilację i wykonanie programu podano poniżej.

Kompilacja i wykonanie

Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (na przykład / home / hadoop).

Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.

Step 1 - Użyj następującego polecenia, aby utworzyć katalog do przechowywania skompilowanych klas Java.

$ mkdir units

Step 2- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Możesz pobrać jar ze strony mvnrepository.com .

Załóżmy, że pobrany folder to / home / hadoop /.

Step 3 - Użyj następujących poleceń, aby skompilować plik WordCount.java program i stworzyć słoik dla programu.

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

Step 4 - Użyj następującego polecenia, aby utworzyć katalog wejściowy w formacie HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Użyj następującego polecenia, aby skopiować plik wejściowy o nazwie input.txt w katalogu wejściowym HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

Step 6 - Użyj następującego polecenia, aby zweryfikować pliki w katalogu wejściowym.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Użyj następującego polecenia, aby uruchomić aplikację licznika słów, pobierając pliki wejściowe z katalogu wejściowego.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania i zadań reduktora.

Step 8 - Użyj następującego polecenia, aby zweryfikować pliki wynikowe w folderze wyjściowym.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Użyj następującego polecenia, aby wyświetlić dane wyjściowe w formacie Part-00000plik. Ten plik jest generowany przez HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Poniżej przedstawiono dane wyjściowe wygenerowane przez program MapReduce.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1