MapReduce - Trình phân vùng

Một trình phân vùng hoạt động giống như một điều kiện để xử lý một tập dữ liệu đầu vào. Giai đoạn phân vùng diễn ra sau giai đoạn Bản đồ và trước giai đoạn Rút gọn.

Số bộ phân vùng bằng số bộ giảm bớt. Điều đó có nghĩa là một trình phân vùng sẽ chia dữ liệu theo số lượng bộ giảm. Do đó, dữ liệu được truyền từ một bộ phân vùng duy nhất được xử lý bởi một Bộ giảm tốc duy nhất.

Người phân vùng

Một trình phân vùng phân vùng các cặp khóa-giá trị của các đầu ra Bản đồ trung gian. Nó phân vùng dữ liệu bằng cách sử dụng một điều kiện do người dùng xác định, hoạt động giống như một hàm băm. Tổng số phân vùng cũng giống như số nhiệm vụ của Bộ giảm cho công việc. Chúng ta hãy lấy một ví dụ để hiểu cách hoạt động của trình phân vùng.

Triển khai phân vùng MapReduce

Để thuận tiện, chúng ta hãy giả sử chúng ta có một bảng nhỏ được gọi là Nhân viên với dữ liệu sau. Chúng tôi sẽ sử dụng dữ liệu mẫu này làm bộ dữ liệu đầu vào để chứng minh cách hoạt động của trình phân vùng.

Tôi Tên Tuổi tác Giới tính Tiền lương
1201 gopal 45 Nam giới 50.000
1202 manisha 40 Giống cái 50.000
1203 khalil 34 Nam giới 30.000
1204 prasanth 30 Nam giới 30.000
1205 kiran 20 Nam giới 40.000
1206 laxmi 25 Giống cái 35.000
1207 bhavya 20 Giống cái 15.000
1208 reshma 19 Giống cái 15.000
1209 kranthi 22 Nam giới 22.000
1210 Hài lòng 24 Nam giới 25.000
1211 Krishna 25 Nam giới 25.000
1212 Arshad 28 Nam giới 20.000
1213 lavanya 18 Giống cái 8.000

Chúng tôi phải viết một ứng dụng để xử lý tập dữ liệu đầu vào để tìm nhân viên được trả lương cao nhất theo giới tính ở các nhóm tuổi khác nhau (ví dụ: dưới 20, từ 21 đến 30, trên 30).

Dữ liệu đầu vào

Dữ liệu trên được lưu dưới dạng input.txt trong thư mục “/ home / hadoop / hadoopPartitioner” và được cung cấp dưới dạng đầu vào.

1201 gopal 45 Nam giới 50000
1202 manisha 40 Giống cái 51000
1203 khaleel 34 Nam giới 30000
1204 prasanth 30 Nam giới 31000
1205 kiran 20 Nam giới 40000
1206 laxmi 25 Giống cái 35000
1207 bhavya 20 Giống cái 15000
1208 reshma 19 Giống cái 14000
1209 kranthi 22 Nam giới 22000
1210 Hài lòng 24 Nam giới 25000
1211 Krishna 25 Nam giới 26000
1212 Arshad 28 Nam giới 20000
1213 lavanya 18 Giống cái 8000

Dựa trên đầu vào đã cho, sau đây là giải thích thuật toán của chương trình.

Nhiệm vụ bản đồ

Tác vụ bản đồ chấp nhận các cặp khóa-giá trị làm đầu vào trong khi chúng ta có dữ liệu văn bản trong tệp văn bản. Đầu vào cho nhiệm vụ bản đồ này như sau:

Input - Khóa sẽ là một mẫu chẳng hạn như “bất kỳ khóa đặc biệt nào + tên tệp + số dòng” (ví dụ: key = @ input1) và giá trị sẽ là dữ liệu trong dòng đó (ví dụ: value = 1201 \ t gopal \ t 45 \ t Nam 50000).

Method - Hoạt động của nhiệm vụ bản đồ này như sau:

  • Đọc value (dữ liệu bản ghi), xuất phát dưới dạng giá trị đầu vào từ danh sách đối số trong một chuỗi.

  • Sử dụng hàm phân tách, tách giới tính và lưu trữ trong một biến chuỗi.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Gửi thông tin giới tính và dữ liệu hồ sơ value dưới dạng cặp khóa-giá trị đầu ra từ nhiệm vụ bản đồ đến partition task.

context.write(new Text(gender), new Text(value));
  • Lặp lại tất cả các bước trên cho tất cả các bản ghi trong tệp văn bản.

Output - Bạn sẽ nhận được dữ liệu giới tính và giá trị dữ liệu bản ghi dưới dạng các cặp khóa-giá trị.

Tác vụ phân vùng

Tác vụ phân vùng chấp nhận các cặp khóa-giá trị từ tác vụ bản đồ làm đầu vào của nó. Phân vùng ngụ ý chia dữ liệu thành các phân đoạn. Theo tiêu chí có điều kiện nhất định của phân vùng, dữ liệu ghép nối khóa-giá trị đầu vào có thể được chia thành ba phần dựa trên tiêu chí độ tuổi.

Input - Toàn bộ dữ liệu trong tập hợp các cặp khóa-giá trị.

key = Giá trị trường giới tính trong bản ghi.

value = Giá trị dữ liệu bản ghi toàn bộ của giới tính đó.

Method - Quá trình logic phân vùng chạy như sau.

  • Đọc giá trị trường tuổi từ cặp khóa-giá trị đầu vào.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Kiểm tra giá trị tuổi với các điều kiện sau.

    • Tuổi nhỏ hơn hoặc bằng 20
    • Tuổi Dưới 20 và Nhỏ hơn hoặc bằng 30.
    • Tuổi Trên 30.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Toàn bộ dữ liệu của các cặp khóa-giá trị được phân đoạn thành ba bộ sưu tập các cặp khóa-giá trị. Hộp giảm tốc hoạt động riêng lẻ trên mỗi bộ sưu tập.

Giảm công việc

Số lượng tác vụ phân vùng bằng số tác vụ bộ giảm tốc. Ở đây chúng ta có ba tác vụ của bộ phân vùng và do đó chúng ta có ba tác vụ của Bộ giảm sẽ được thực thi.

Input - Trình giảm sẽ thực hiện ba lần với các bộ sưu tập các cặp khóa-giá trị khác nhau.

key = giá trị trường giới tính trong bản ghi.

value = toàn bộ dữ liệu bản ghi của giới tính đó.

Method - Logic sau đây sẽ được áp dụng trên mỗi bộ sưu tập.

  • Đọc giá trị trường Lương của mỗi bản ghi.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Kiểm tra mức lương với biến max. Nếu str [4] là mức lương tối đa, thì hãy gán str [4] là tối đa, nếu không thì bỏ qua bước.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Lặp lại các Bước 1 và 2 cho mỗi bộ sưu tập khóa (Nam & Nữ là bộ sưu tập khóa). Sau khi thực hiện ba bước này, bạn sẽ tìm thấy một mức lương tối đa từ bộ sưu tập khóa Nam và một mức lương tối đa từ bộ sưu tập khóa Nữ.

context.write(new Text(key), new IntWritable(max));

Output- Cuối cùng, bạn sẽ nhận được một bộ dữ liệu cặp khóa-giá trị trong ba bộ sưu tập của các nhóm tuổi khác nhau. Nó chứa mức lương tối đa từ bộ sưu tập Nam và mức lương tối đa từ bộ sưu tập Nữ tương ứng ở từng nhóm tuổi.

Sau khi thực hiện các tác vụ Bản đồ, Bộ phân vùng và Rút gọn, ba bộ sưu tập dữ liệu cặp khóa-giá trị được lưu trữ trong ba tệp khác nhau dưới dạng đầu ra.

Tất cả ba tác vụ được coi là công việc MapReduce. Các yêu cầu và thông số kỹ thuật sau đây của những công việc này cần được chỉ rõ trong Cấu hình -

  • Tên công việc
  • Định dạng đầu vào và đầu ra của các khóa và giá trị
  • Các lớp cá nhân cho các tác vụ Bản đồ, Rút gọn và Phân vùng
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Chương trình mẫu

Chương trình sau đây cho biết cách triển khai các trình phân vùng cho các tiêu chí đã cho trong chương trình MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Lưu mã trên dưới dạng PartitionerExample.javatrong “/ home / hadoop / hadoopPartitioner”. Việc biên dịch và thực hiện chương trình được đưa ra dưới đây.

Biên dịch và Thực hiện

Giả sử chúng tôi đang ở trong thư mục chính của người dùng Hadoop (ví dụ: / home / hadoop).

Làm theo các bước dưới đây để biên dịch và thực thi chương trình trên.

Step 1- Tải xuống Hadoop-core-1.2.1.jar, được sử dụng để biên dịch và thực thi chương trình MapReduce. Bạn có thể tải xuống jar từ mvnrepository.com .

Giả sử thư mục đã tải xuống là “/ home / hadoop / hadoopPartitioner”

Step 2 - Các lệnh sau được sử dụng để biên dịch chương trình PartitionerExample.java và tạo một jar cho chương trình.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - Sử dụng lệnh sau để tạo thư mục đầu vào trong HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Sử dụng lệnh sau để sao chép tệp đầu vào có tên input.txt trong thư mục đầu vào của HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Sử dụng lệnh sau để xác minh các tệp trong thư mục đầu vào.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Sử dụng lệnh sau để chạy ứng dụng Top lương bằng cách lấy các tập tin đầu vào từ thư mục đầu vào.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Chờ một lúc cho đến khi tệp được thực thi. Sau khi thực thi, đầu ra chứa một số phân tách đầu vào, tác vụ bản đồ và tác vụ Bộ giảm tốc.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Sử dụng lệnh sau để xác minh các tệp kết quả trong thư mục đầu ra.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Bạn sẽ tìm thấy kết quả đầu ra trong ba tệp vì bạn đang sử dụng ba bộ phân vùng và ba bộ giảm bớt trong chương trình của mình.

Step 8 - Sử dụng lệnh sau để xem kết quả đầu ra trong Part-00000tập tin. Tệp này được tạo bởi HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Sử dụng lệnh sau để xem kết quả trong Part-00001 tập tin.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Sử dụng lệnh sau để xem kết quả trong Part-00002 tập tin.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000