MapReduce - Partitioner

Pemartisi bekerja seperti kondisi dalam memproses kumpulan data masukan. Fase partisi berlangsung setelah fase Peta dan sebelum fase Mengurangi.

Jumlah pemartisi sama dengan jumlah reduksi. Itu artinya seorang pemartisi akan membagi data sesuai dengan jumlah reduksi. Oleh karena itu, data yang dikirimkan dari satu pemartisi diproses oleh satu Peredam.

Partisi

Pemartisi mempartisi pasangan nilai kunci dari keluaran Peta perantara. Ini mempartisi data menggunakan kondisi yang ditentukan pengguna, yang berfungsi seperti fungsi hash. Jumlah total partisi sama dengan jumlah tugas Peredam untuk pekerjaan itu. Mari kita ambil contoh untuk memahami cara kerja pemartisi.

Implementasi MapReduce Partitioner

Demi kenyamanan, mari kita asumsikan kita memiliki tabel kecil bernama Karyawan dengan data berikut. Kami akan menggunakan data sampel ini sebagai kumpulan data masukan kami untuk menunjukkan cara kerja pemartisi.

Indo Nama Usia Jenis kelamin Gaji
1201 gopal 45 Pria 50.000
1202 manisha 40 Perempuan 50.000
1203 khalil 34 Pria 30.000
1204 prasanth 30 Pria 30.000
1205 kiran 20 Pria 40.000
1206 laxmi 25 Perempuan 35.000
1207 bhavya 20 Perempuan 15.000
1208 reshma 19 Perempuan 15.000
1209 kranthi 22 Pria 22.000
1210 Satish 24 Pria 25.000
1211 Krishna 25 Pria 25.000
1212 Arshad 28 Pria 20.000
1213 lavanya 18 Perempuan 8.000

Kami harus membuat aplikasi untuk memproses kumpulan data masukan untuk menemukan karyawan dengan gaji tertinggi menurut jenis kelamin di berbagai kelompok usia (misalnya, di bawah 20, antara 21 hingga 30, di atas 30).

Memasukan data

Data di atas disimpan sebagai input.txt di direktori “/ home / hadoop / hadoopPartitioner” dan diberikan sebagai masukan.

1201 gopal 45 Pria 50000
1202 manisha 40 Perempuan 51000
1203 khaleel 34 Pria 30000
1204 prasanth 30 Pria 31000
1205 kiran 20 Pria 40000
1206 laxmi 25 Perempuan 35000
1207 bhavya 20 Perempuan 15000
1208 reshma 19 Perempuan 14000
1209 kranthi 22 Pria 22000
1210 Satish 24 Pria 25000
1211 Krishna 25 Pria 26000
1212 Arshad 28 Pria 20000
1213 lavanya 18 Perempuan 8000

Berdasarkan masukan yang diberikan, berikut penjelasan algoritmik program tersebut.

Tugas Peta

Tugas peta menerima pasangan nilai kunci sebagai input sementara kita memiliki data teks dalam file teks. Input untuk tugas peta ini adalah sebagai berikut -

Input - Kuncinya adalah pola seperti "kunci khusus + nama file + nomor baris" (contoh: key = @ input1) dan nilainya adalah data di baris itu (contoh: nilai = 1201 \ t gopal \ t 45 \ t Pria \ t 50000).

Method - Pengoperasian tugas peta ini adalah sebagai berikut -

  • Membaca value (record data), yang datang sebagai nilai input dari daftar argumen dalam sebuah string.

  • Menggunakan fungsi split, pisahkan jenis kelamin dan simpan dalam variabel string.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Kirim informasi gender dan data catatan value sebagai pasangan kunci-nilai keluaran dari tugas peta ke partition task.

context.write(new Text(gender), new Text(value));
  • Ulangi semua langkah di atas untuk semua catatan di file teks.

Output - Anda akan mendapatkan data jenis kelamin dan nilai data catatan sebagai pasangan nilai kunci.

Tugas Partisi

Tugas pemartisi menerima pasangan nilai kunci dari tugas peta sebagai inputnya. Partisi berarti membagi data menjadi beberapa segmen. Menurut kriteria partisi bersyarat yang diberikan, data berpasangan nilai kunci yang dimasukkan dapat dibagi menjadi tiga bagian berdasarkan kriteria usia.

Input - Seluruh data dalam kumpulan pasangan nilai-kunci.

key = Nilai bidang jenis kelamin dalam catatan.

nilai = Nilai data catatan utuh dari jenis kelamin itu.

Method - Proses logika partisi berjalan sebagai berikut.

  • Baca nilai bidang usia dari input key-value pair.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Cek nilai umur dengan ketentuan sebagai berikut.

    • Usia kurang dari atau sama dengan 20
    • Usia Lebih dari 20 dan Kurang dari atau sama dengan 30.
    • Usia Lebih dari 30.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Seluruh data pasangan nilai kunci tersegmentasi menjadi tiga kumpulan pasangan nilai kunci. Reducer bekerja secara individual pada setiap koleksi.

Kurangi Tugas

Jumlah tugas pemartisi sama dengan jumlah tugas peredam. Di sini kami memiliki tiga tugas pemartisi dan karenanya kami memiliki tiga tugas Reducer untuk dieksekusi.

Input - Reducer akan mengeksekusi tiga kali dengan koleksi key-value pair yang berbeda.

key = nilai bidang gender dalam catatan.

nilai = seluruh data catatan jenis kelamin itu.

Method - Logika berikut akan diterapkan pada setiap koleksi.

  • Baca nilai bidang Gaji dari setiap catatan.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Periksa gaji dengan variabel maks. Jika str [4] adalah gaji maksimum, maka tetapkan str [4] ke max, jika tidak lewati langkah tersebut.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Ulangi Langkah 1 dan 2 untuk setiap koleksi kunci (Pria & Wanita adalah koleksi kunci). Setelah melakukan tiga langkah ini, Anda akan menemukan satu gaji maksimal dari koleksi kunci Pria dan satu gaji maksimal dari koleksi kunci Wanita.

context.write(new Text(key), new IntWritable(max));

Output- Terakhir, Anda akan mendapatkan sekumpulan data pasangan nilai kunci dalam tiga kumpulan kelompok usia yang berbeda. Ini berisi gaji maksimal dari koleksi Pria dan gaji maksimal dari koleksi Wanita di masing-masing kelompok umur.

Setelah menjalankan tugas Map, Partitioner, dan Reduce, tiga kumpulan data key-value pair disimpan dalam tiga file berbeda sebagai output.

Ketiga tugas tersebut diperlakukan sebagai pekerjaan MapReduce. Persyaratan dan spesifikasi pekerjaan ini berikut harus ditentukan dalam Konfigurasi -

  • Nama Pekerjaan
  • Format Input dan Output dari kunci dan nilai
  • Kelas individual untuk tugas Map, Reduce, dan Partitioner
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Contoh Program

Program berikut menunjukkan bagaimana mengimplementasikan pemartisi untuk kriteria yang diberikan dalam program MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Simpan kode di atas sebagai PartitionerExample.javadi "/ home / hadoop / hadoopPartitioner". Kompilasi dan eksekusi program diberikan di bawah ini.

Kompilasi dan Eksekusi

Mari kita asumsikan kita berada di direktori home dari pengguna Hadoop (misalnya, / home / hadoop).

Ikuti langkah-langkah yang diberikan di bawah ini untuk mengkompilasi dan menjalankan program di atas.

Step 1- Unduh Hadoop-core-1.2.1.jar, yang digunakan untuk mengkompilasi dan menjalankan program MapReduce. Anda dapat mengunduh jar dari mvnrepository.com .

Mari kita asumsikan folder yang diunduh adalah "/ home / hadoop / hadoopPartitioner"

Step 2 - Perintah berikut digunakan untuk menyusun program PartitionerExample.java dan membuat toples untuk program tersebut.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - Gunakan perintah berikut untuk membuat direktori input di HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Gunakan perintah berikut untuk menyalin file input bernama input.txt di direktori input HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Gunakan perintah berikut untuk memverifikasi file di direktori input.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Gunakan perintah berikut untuk menjalankan aplikasi Gaji teratas dengan mengambil file input dari direktori input.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Tunggu beberapa saat hingga file dieksekusi. Setelah dieksekusi, output berisi sejumlah input split, tugas peta, dan tugas Reducer.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Gunakan perintah berikut untuk memverifikasi file yang dihasilkan di folder keluaran.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Anda akan menemukan output dalam tiga file karena Anda menggunakan tiga pemartisi dan tiga Reducer dalam program Anda.

Step 8 - Gunakan perintah berikut untuk melihat keluarannya Part-00000mengajukan. File ini dibuat oleh HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Gunakan perintah berikut untuk melihat keluarannya Part-00001 mengajukan.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Gunakan perintah berikut untuk melihat keluarannya Part-00002 mengajukan.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000