MapReduce - พาร์ติชันเนอร์

พาร์ติชันเนอร์ทำงานเหมือนเงื่อนไขในการประมวลผลชุดข้อมูลอินพุต เฟสพาร์ติชันเกิดขึ้นหลังเฟสแผนที่และก่อนเฟสลด

จำนวนพาร์ติชันเท่ากับจำนวนตัวลด นั่นหมายความว่าพาร์ติชันเนอร์จะแบ่งข้อมูลตามจำนวนตัวลด ดังนั้นข้อมูลที่ส่งผ่านจากพาร์ติชันเดียวจะถูกประมวลผลโดยตัวลดขนาดเดียว

พาร์ทิชันเนอร์

พาร์ติชันเนอร์แบ่งพาร์ติชันคู่คีย์ - ค่าของแม็พเอาต์พุตระดับกลาง แบ่งพาร์ติชันข้อมูลโดยใช้เงื่อนไขที่ผู้ใช้กำหนดซึ่งทำงานเหมือนกับฟังก์ชันแฮช จำนวนพาร์ติชันทั้งหมดเท่ากับจำนวนงานลดสำหรับงาน ให้เรายกตัวอย่างเพื่อทำความเข้าใจวิธีการทำงานของพาร์ติชันเนอร์

การติดตั้ง MapReduce Partitioner

เพื่อความสะดวกสมมติว่าเรามีตารางขนาดเล็กที่เรียกว่าพนักงานพร้อมข้อมูลต่อไปนี้ เราจะใช้ข้อมูลตัวอย่างนี้เป็นชุดข้อมูลอินพุตของเราเพื่อสาธิตการทำงานของพาร์ติชันเนอร์

Id ชื่อ อายุ เพศ เงินเดือน
1201 โกปาล 45 ชาย 50,000
1202 มานิชา 40 หญิง 50,000
1203 คาลิล 34 ชาย 30,000
1204 พระสันต 30 ชาย 30,000
1205 คีรัน 20 ชาย 40,000
1206 ลักษมี 25 หญิง 35,000
1207 ภควา 20 หญิง 15,000
1208 reshma 19 หญิง 15,000
1209 กระท้อน 22 ชาย 22,000
1210 Satish 24 ชาย 25,000
1211 กฤษณะ 25 ชาย 25,000
1212 Arshad 28 ชาย 20,000
1213 Lavanya 18 หญิง 8,000

เราต้องเขียนแอปพลิเคชันเพื่อประมวลผลชุดข้อมูลอินพุตเพื่อค้นหาพนักงานที่ได้รับเงินเดือนสูงสุดตามเพศในกลุ่มอายุต่างๆ (เช่นอายุต่ำกว่า 20 ปีระหว่าง 21 ถึง 30 และสูงกว่า 30)

ป้อนข้อมูล

ข้อมูลข้างต้นถูกบันทึกเป็น input.txt ในไดเร็กทอรี“ / home / hadoop / hadoopPartitioner” และกำหนดให้เป็นอินพุต

1201 โกปาล 45 ชาย 50000
1202 มานิชา 40 หญิง 51000
1203 khaleel 34 ชาย 30000
1204 พระสันต 30 ชาย 31000
1205 คีรัน 20 ชาย 40000
1206 ลักษมี 25 หญิง 35000
1207 ภควา 20 หญิง 15000
1208 reshma 19 หญิง 14000
1209 กระท้อน 22 ชาย 22000
1210 Satish 24 ชาย 25000
1211 กฤษณะ 25 ชาย 26000
1212 Arshad 28 ชาย 20000
1213 Lavanya 18 หญิง 8000

ขึ้นอยู่กับอินพุตที่กำหนดต่อไปนี้เป็นคำอธิบายอัลกอริทึมของโปรแกรม

งานแผนที่

งานแผนที่ยอมรับคู่คีย์ - ค่าเป็นอินพุตในขณะที่เรามีข้อมูลข้อความในไฟล์ข้อความ อินพุตสำหรับงานแผนที่นี้มีดังนี้ -

Input - คีย์จะเป็นรูปแบบเช่น "คีย์พิเศษใด ๆ + ชื่อไฟล์ + หมายเลขบรรทัด" (ตัวอย่าง: key = @ input1) และค่าจะเป็นข้อมูลในบรรทัดนั้น (ตัวอย่าง: value = 1201 \ t gopal \ t 45 \ ชาย \ t 50000)

Method - การดำเนินงานของแผนที่นี้มีดังต่อไปนี้ -

  • อ่าน value (บันทึกข้อมูล) ซึ่งมาเป็นค่าอินพุตจากรายการอาร์กิวเมนต์ในสตริง

  • ใช้ฟังก์ชันแยกแยกเพศและจัดเก็บในตัวแปรสตริง

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • ส่งข้อมูลเพศและข้อมูลบันทึก value เป็นคู่คีย์ - ค่าเอาต์พุตจากงานแผนที่ไปยัง partition task.

context.write(new Text(gender), new Text(value));
  • ทำซ้ำขั้นตอนข้างต้นทั้งหมดสำหรับระเบียนทั้งหมดในไฟล์ข้อความ

Output - คุณจะได้รับข้อมูลเพศและค่าข้อมูลบันทึกเป็นคู่คีย์ - ค่า

งาน Partitioner

งาน partitioner ยอมรับคู่คีย์ - ค่าจากงานแผนที่เป็นอินพุต พาร์ติชันหมายถึงการแบ่งข้อมูลออกเป็นเซ็กเมนต์ ตามเกณฑ์เงื่อนไขที่กำหนดของพาร์ติชันข้อมูลที่จับคู่คีย์ - ค่าอินพุตสามารถแบ่งออกเป็นสามส่วนตามเกณฑ์อายุ

Input - ข้อมูลทั้งหมดในชุดของคู่คีย์ - ค่า

คีย์ = ค่าฟิลด์เพศในเรกคอร์ด

value = ค่าข้อมูลบันทึกทั้งหมดของเพศนั้น

Method - กระบวนการของตรรกะพาร์ติชันทำงานดังนี้

  • อ่านค่าฟิลด์อายุจากคู่คีย์ - ค่าอินพุต
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • ตรวจสอบค่าอายุด้วยเงื่อนไขต่อไปนี้

    • อายุน้อยกว่าหรือเท่ากับ 20
    • อายุมากกว่า 20 และน้อยกว่าหรือเท่ากับ 30
    • อายุมากกว่า 30 ปี
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- ข้อมูลทั้งหมดของคู่คีย์ - ค่าจะแบ่งออกเป็นสามคอลเลกชันของคู่คีย์ - ค่า Reducer ทำงานแยกกันในแต่ละคอลเลกชัน

ลดงาน

จำนวนงานพาร์ติชันเนอร์เท่ากับจำนวนงานตัวลด ที่นี่เรามีงานพาร์ติชันสามอย่างและด้วยเหตุนี้เราจึงมีงานลดสามงานที่ต้องดำเนินการ

Input - ตัวลดจะดำเนินการสามครั้งพร้อมคอลเลกชันคู่คีย์ - ค่าที่แตกต่างกัน

คีย์ = ค่าฟิลด์เพศในเรกคอร์ด

value = ข้อมูลบันทึกทั้งหมดของเพศนั้น

Method - ตรรกะต่อไปนี้จะถูกนำไปใช้กับแต่ละคอลเลกชัน

  • อ่านค่าฟิลด์เงินเดือนของแต่ละระเบียน
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • ตรวจสอบเงินเดือนด้วยตัวแปรสูงสุด ถ้า str [4] คือเงินเดือนสูงสุดให้กำหนด str [4] เป็น max มิฉะนั้นให้ข้ามขั้นตอน

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • ทำซ้ำขั้นตอนที่ 1 และ 2 สำหรับแต่ละคอลเลกชันคีย์ (ชายและหญิงเป็นคอลเลกชันหลัก) หลังจากดำเนินการสามขั้นตอนนี้แล้วคุณจะพบเงินเดือนสูงสุดหนึ่งรายการจากคอลเลกชันคีย์ชายและหนึ่งเงินเดือนสูงสุดจากคอลเลคชันคีย์หญิง

context.write(new Text(key), new IntWritable(max));

Output- สุดท้ายคุณจะได้รับชุดข้อมูลคู่คีย์ - ค่าในคอลเลกชันสามกลุ่มอายุที่แตกต่างกัน ประกอบด้วยเงินเดือนสูงสุดจากคอลเลกชันชายและเงินเดือนสูงสุดจากคอลเลกชันหญิงในแต่ละกลุ่มอายุตามลำดับ

หลังจากดำเนินการแผนที่พาร์ติชันเนอร์และงานลดแล้วคอลเลกชันของข้อมูลคู่คีย์ - ค่าทั้งสามจะถูกเก็บไว้ในไฟล์ที่แตกต่างกันสามไฟล์เป็นเอาต์พุต

งานทั้งสามจะถือว่าเป็นงาน MapReduce ข้อกำหนดและคุณสมบัติเฉพาะของงานเหล่านี้ควรระบุไว้ใน Configurations -

  • ชื่องาน
  • รูปแบบอินพุตและเอาต์พุตของคีย์และค่า
  • แต่ละคลาสสำหรับงาน Map, Reduce และ Partitioner
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

ตัวอย่างโปรแกรม

โปรแกรมต่อไปนี้แสดงวิธีการใช้งานพาร์ติชันสำหรับเกณฑ์ที่กำหนดในโปรแกรม MapReduce

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

บันทึกรหัสด้านบนเป็น PartitionerExample.javaใน“ / home / hadoop / hadoopPartitioner” การรวบรวมและการดำเนินการของโปรแกรมมีดังต่อไปนี้

การรวบรวมและการดำเนินการ

สมมติว่าเราอยู่ในโฮมไดเร็กทอรีของผู้ใช้ Hadoop (ตัวอย่างเช่น / home / hadoop)

ทำตามขั้นตอนด้านล่างเพื่อคอมไพล์และรันโปรแกรมข้างต้น

Step 1- ดาวน์โหลด Hadoop-core-1.2.1.jar ซึ่งใช้ในการคอมไพล์และรันโปรแกรม MapReduce คุณสามารถดาวน์โหลดขวดจากmvnrepository.com

ให้เราถือว่าโฟลเดอร์ที่ดาวน์โหลดคือ“ / home / hadoop / hadoopPartitioner”

Step 2 - คำสั่งต่อไปนี้ใช้สำหรับการคอมไพล์โปรแกรม PartitionerExample.java และสร้าง jar สำหรับโปรแกรม

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - ใช้คำสั่งต่อไปนี้เพื่อสร้างไดเร็กทอรีอินพุตใน HDFS

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - ใช้คำสั่งต่อไปนี้เพื่อคัดลอกไฟล์อินพุตที่ชื่อ input.txt ในไดเร็กทอรีอินพุตของ HDFS

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - ใช้คำสั่งต่อไปนี้เพื่อตรวจสอบไฟล์ในไดเร็กทอรีอินพุต

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - ใช้คำสั่งต่อไปนี้เพื่อเรียกใช้แอปพลิเคชันเงินเดือนสูงสุดโดยรับไฟล์อินพุตจากไดเร็กทอรีอินพุต

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

รอสักครู่จนกว่าไฟล์จะถูกเรียกใช้งาน หลังจากดำเนินการแล้วเอาต์พุตจะมีการแยกอินพุตงานแผนที่และงานลดจำนวน

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - ใช้คำสั่งต่อไปนี้เพื่อตรวจสอบไฟล์ผลลัพธ์ในโฟลเดอร์ผลลัพธ์

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

คุณจะพบผลลัพธ์ในสามไฟล์เนื่องจากคุณใช้พาร์ติชันสามตัวและตัวลดสามตัวในโปรแกรมของคุณ

Step 8 - ใช้คำสั่งต่อไปนี้เพื่อดูผลลัพธ์ในรูปแบบ Part-00000ไฟล์. ไฟล์นี้สร้างโดย HDFS

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

ใช้คำสั่งต่อไปนี้เพื่อดูผลลัพธ์ใน Part-00001 ไฟล์.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

ใช้คำสั่งต่อไปนี้เพื่อดูผลลัพธ์ใน Part-00002 ไฟล์.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000