MapReduce - Bölümleyici

Bir bölümleyici, bir girdi veri kümesini işlemede bir koşul gibi çalışır. Bölme aşaması, Harita aşamasından sonra ve Azaltma aşamasından önce gerçekleşir.

Bölme sayısı, redüktör sayısına eşittir. Bu, bir bölümleyicinin verileri redüktör sayısına göre böleceği anlamına gelir. Bu nedenle, tek bir bölümleyiciden aktarılan veriler tek bir Redüktör tarafından işlenir.

Bölümleyici

Bir bölümleyici, ara Harita çıktılarının anahtar-değer çiftlerini bölümler. Karma işlevi gibi çalışan, kullanıcı tanımlı bir koşul kullanarak verileri bölümler. Toplam bölüm sayısı, iş için Reducer görevlerinin sayısıyla aynıdır. Bölümleyicinin nasıl çalıştığını anlamak için bir örnek alalım.

MapReduce Partitioner Uygulaması

Kolaylık olması açısından, aşağıdaki verileri içeren Çalışan adında küçük bir tablomuz olduğunu varsayalım. Bu örnek verileri, bölümleyicinin nasıl çalıştığını göstermek için girdi veri kümemiz olarak kullanacağız.

İD İsim Yaş Cinsiyet Maaş
1201 gopal 45 Erkek 50.000
1202 Manisha 40 Kadın 50.000
1203 Khalil 34 Erkek 30.000
1204 prasant 30 Erkek 30.000
1205 Kiran 20 Erkek 40.000
1206 Laxmi 25 Kadın 35.000
1207 Bhavya 20 Kadın 15.000
1208 reshma 19 Kadın 15.000
1209 Kranthi 22 Erkek 22.000
1210 Satish 24 Erkek 25.000
1211 Krishna 25 Erkek 25.000
1212 Arshad 28 Erkek 20.000
1213 Lavanya 18 Kadın 8.000

Farklı yaş gruplarında cinsiyete göre en yüksek maaşlı çalışanı bulmak için girdi veri setini işlemek için bir uygulama yazmalıyız (örneğin, 20'nin altı, 21-30 arası, 30'un üstü).

Giriş Verileri

Yukarıdaki veriler şu şekilde kaydedilir: input.txt "/ home / hadoop / hadoopPartitioner" dizininde ve girdi olarak verilir.

1201 gopal 45 Erkek 50000
1202 Manisha 40 Kadın 51000
1203 Khaleel 34 Erkek 30000
1204 prasant 30 Erkek 31000
1205 Kiran 20 Erkek 40000
1206 Laxmi 25 Kadın 35.000
1207 Bhavya 20 Kadın 15.000
1208 reshma 19 Kadın 14.000
1209 Kranthi 22 Erkek 22.000
1210 Satish 24 Erkek 25.000
1211 Krishna 25 Erkek 26.000
1212 Arshad 28 Erkek 20.000
1213 Lavanya 18 Kadın 8000

Verilen girdiye bağlı olarak, programın algoritmik açıklaması aşağıdadır.

Harita Görevleri

Harita görevi, metin verisini bir metin dosyasında tuttuğumuzda, anahtar-değer çiftlerini girdi olarak kabul eder. Bu harita görevinin girdisi aşağıdaki gibidir -

Input - Anahtar, "herhangi bir özel anahtar + dosya adı + satır numarası" (örnek: anahtar = @ girdi1) gibi bir kalıp olur ve değer, bu satırdaki veriler olur (örnek: değer = 1201 \ t gopal \ t 45 \ t Erkek \ t 50000).

Method - Bu harita görevinin çalışması aşağıdaki gibidir -

  • Okumak value (kayıt verileri), bir dizedeki bağımsız değişken listesinden giriş değeri olarak gelir.

  • Bölme işlevini kullanarak cinsiyeti ayırın ve bir dize değişkeninde saklayın.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Cinsiyet bilgilerini ve kayıt verilerini gönderin value eşleme görevinden anahtar / değer çifti çıktısı olarak partition task.

context.write(new Text(gender), new Text(value));
  • Metin dosyasındaki tüm kayıtlar için yukarıdaki tüm adımları tekrarlayın.

Output - Cinsiyet verilerini ve kayıt veri değerini anahtar / değer çiftleri olarak alacaksınız.

Partitioner Görevi

Bölümleyici görevi, eşleme görevindeki anahtar-değer çiftlerini girdi olarak kabul eder. Bölümleme, verilerin bölümlere bölünmesi anlamına gelir. Bölümlerin verilen koşullu kriterlerine göre, giriş anahtar-değer eşleştirilmiş verileri yaş kriterine göre üç bölüme ayrılabilir.

Input - Anahtar / değer çiftlerinden oluşan bir koleksiyondaki verilerin tamamı.

key = Kayıttaki cinsiyet alanı değeri.

değer = Bu cinsiyetin tüm kayıt veri değeri.

Method - Bölümleme mantığı süreci aşağıdaki gibi çalışır.

  • Giriş anahtar / değer çiftinden yaş alanı değerini okuyun.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Yaş değerini aşağıdaki koşullarla kontrol edin.

    • 20'den küçük veya eşit yaş
    • Yaş 20'den büyük ve 30'dan küçük veya 30'a eşit.
    • Yaş 30'dan büyük.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Anahtar / değer çiftlerinin tüm verileri, anahtar / değer çiftlerinin üç koleksiyonuna bölünür. Redüktör, her koleksiyonda ayrı ayrı çalışır.

Görevleri Azaltın

Bölme görevlerinin sayısı, düşürücü görevlerin sayısına eşittir. Burada üç bölümleyici görevimiz var ve dolayısıyla gerçekleştirilecek üç Redüktör görevimiz var.

Input - İndirgeyici, farklı anahtar / değer çiftleri koleksiyonuyla üç kez çalıştırılır.

kayıttaki anahtar = cinsiyet alanı değeri.

değer = o cinsiyetin tüm kayıt verileri.

Method - Her koleksiyona aşağıdaki mantık uygulanacaktır.

  • Her kaydın Maaş alanı değerini okuyun.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Maaşı maksimum değişkenle kontrol edin. Eğer str [4] maksimum maaş ise, str [4] 'ü max'a atayın, aksi takdirde adımı atlayın.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Her bir anahtar teslimi için Adım 1 ve 2'yi tekrarlayın (Erkek ve Kadın anahtar koleksiyonlardır). Bu üç adımı uyguladıktan sonra, Erkek anahtar koleksiyonundan bir maksimum maaş ve Kadın anahtar koleksiyonundan bir maksimum maaş bulacaksınız.

context.write(new Text(key), new IntWritable(max));

Output- Son olarak, farklı yaş gruplarından oluşan üç koleksiyonda bir dizi anahtar / değer çifti verisi alacaksınız. Sırasıyla her yaş grubundaki Erkek koleksiyonundan maksimum maaşı ve Kadın koleksiyonundan maksimum maaşı içerir.

Map, Partitioner ve Reduce görevlerini çalıştırdıktan sonra, anahtar-değer çifti verilerinin üç koleksiyonu çıktı olarak üç farklı dosyada depolanır.

Üç görevin tümü MapReduce işleri olarak değerlendirilir. Bu işlerin aşağıdaki gereksinimleri ve özellikleri Konfigürasyonlarda belirtilmelidir -

  • İş adı
  • Anahtarların ve değerlerin giriş ve çıkış formatları
  • Map, Reduce ve Partitioner görevleri için ayrı sınıflar
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Örnek Program

Aşağıdaki program, bir MapReduce programında verilen kriterler için bölümleyicilerin nasıl uygulanacağını gösterir.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Yukarıdaki kodu farklı kaydedin PartitionerExample.java"/ home / hadoop / hadoopPartitioner" içinde. Programın derlenmesi ve çalıştırılması aşağıda verilmiştir.

Derleme ve Yürütme

Hadoop kullanıcısının ana dizininde olduğumuzu varsayalım (örneğin, / home / hadoop).

Yukarıdaki programı derlemek ve yürütmek için aşağıda verilen adımları izleyin.

Step 1- MapReduce programını derlemek ve yürütmek için kullanılan Hadoop-core-1.2.1.jar dosyasını indirin. Kavanozu mvnrepository.com adresinden indirebilirsiniz .

İndirilen klasörün "/ home / hadoop / hadoopPartitioner" olduğunu varsayalım

Step 2 - Programı derlemek için aşağıdaki komutlar kullanılır PartitionerExample.java ve program için bir kavanoz oluşturmak.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - HDFS'de bir giriş dizini oluşturmak için aşağıdaki komutu kullanın.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - adlı giriş dosyasını kopyalamak için aşağıdaki komutu kullanın input.txt HDFS'nin giriş dizininde.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Giriş dizinindeki dosyaları doğrulamak için aşağıdaki komutu kullanın.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Giriş dosyalarını giriş dizininden alarak En yüksek maaş uygulamasını çalıştırmak için aşağıdaki komutu kullanın.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Dosya yürütülene kadar bir süre bekleyin. Yürütmeden sonra, çıktı bir dizi giriş bölmesi, harita görevleri ve Redüktör görevleri içerir.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Çıktı klasöründe ortaya çıkan dosyaları doğrulamak için aşağıdaki komutu kullanın.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Çıktıyı üç dosyada bulacaksınız çünkü programınızda üç bölümleyici ve üç Redüktör kullanıyorsunuz.

Step 8 - Çıktıyı görmek için aşağıdaki komutu kullanın Part-00000dosya. Bu dosya HDFS tarafından oluşturulmuştur.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Çıktıyı görmek için aşağıdaki komutu kullanın. Part-00001 dosya.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Çıktıyı görmek için aşağıdaki komutu kullanın. Part-00002 dosya.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000