MapReduce-파티션 도구

파티 셔 너는 입력 데이터 세트를 처리 할 때 조건처럼 작동합니다. 파티션 단계는 매핑 단계 이후와 축소 단계 이전에 발생합니다.

파티 셔 너의 수는 리듀서의 수와 같습니다. 즉, 파티 셔 너는 리듀서의 수에 따라 데이터를 나눕니다. 따라서 단일 파티 셔너에서 전달 된 데이터는 단일 Reducer에서 처리됩니다.

파티 셔너

파티 셔 너는 중간 맵 출력의 키-값 쌍을 분할합니다. 해시 함수처럼 작동하는 사용자 정의 조건을 사용하여 데이터를 분할합니다. 총 파티션 수는 작업의 Reducer 작업 수와 동일합니다. 파티 셔 너가 어떻게 작동하는지 이해하기위한 예를 들어 보겠습니다.

MapReduce 파티션 도구 구현

편의를 위해 다음 데이터가 포함 된 Employee라는 작은 테이블이 있다고 가정하겠습니다. 이 샘플 데이터를 입력 데이터 세트로 사용하여 파티 셔 너가 작동하는 방식을 보여줍니다.

신분증 이름 나이 성별 봉급
1201 고팔 45 남성 50,000
1202 Manisha 40 여자 50,000
1203 년 칼릴 34 남성 30,000
1204 프라 산스 30 남성 30,000
1205 년 키란 20 남성 40,000
1206 년 Laxmi 25 여자 35,000
1207 바비 아 20 여자 15,000
1208 년 레 쉬마 19 여자 15,000
1209 년 Kranthi 22 남성 22,000
1210 년 Satish 24 남성 25,000
1211 크리슈나 25 남성 25,000
1212 년 아르 샤드 28 남성 20,000
1213 년 Lavanya 18 여자 8,000

다른 연령대 (예 : 20 세 미만, 21 세 ~ 30 세, 30 세 이상)에서 성별로 가장 높은 급여를받는 직원을 찾기 위해 입력 데이터 세트를 처리하는 애플리케이션을 작성해야합니다.

입력 데이터

위의 데이터는 다음과 같이 저장됩니다. input.txt "/ home / hadoop / hadoopPartitioner"디렉토리에서 입력으로 제공됩니다.

1201 고팔 45 남성 50000
1202 Manisha 40 여자 51000
1203 년 Khaleel 34 남성 30000
1204 프라 산스 30 남성 31000
1205 년 키란 20 남성 40000
1206 년 Laxmi 25 여자 35000
1207 바비 아 20 여자 15000
1208 년 레 쉬마 19 여자 14000
1209 년 Kranthi 22 남성 22000
1210 년 Satish 24 남성 25000
1211 크리슈나 25 남성 26000
1212 년 아르 샤드 28 남성 20000
1213 년 Lavanya 18 여자 8000

주어진 입력에 따라 다음은 프로그램의 알고리즘 설명입니다.

작업 매핑

지도 작업은 텍스트 파일에 텍스트 데이터가있는 동안 키-값 쌍을 입력으로받습니다. 이지도 작업에 대한 입력은 다음과 같습니다.

Input − 키는 "모든 특수 키 + 파일 이름 + 줄 번호"(예 : 키 = @ input1)와 같은 패턴이고 값은 해당 줄의 데이터입니다 (예 : 값 = 1201 \ t gopal \ t 45 \ t 남성 \ t 50000).

Method −이지도 작업의 동작은 다음과 같습니다 −

  • 읽기 value (레코드 데이터), 문자열의 인수 목록에서 입력 값으로 제공됩니다.

  • split 함수를 사용하여 성별을 분리하고 문자열 변수에 저장합니다.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • 성별 정보 및 기록 데이터 보내기 value 매핑 작업에서 출력 키-값 쌍으로 partition task.

context.write(new Text(gender), new Text(value));
  • 텍스트 파일의 모든 레코드에 대해 위의 모든 단계를 반복합니다.

Output − 성별 데이터와 레코드 데이터 값을 키-값 쌍으로 가져옵니다.

파티 셔너 작업

파티 셔너 태스크는 맵 태스크의 키-값 쌍을 입력으로받습니다. 파티션은 데이터를 세그먼트로 나누는 것을 의미합니다. 주어진 파티션 조건부 기준에 따라 입력 키-값 쌍 데이터는 연령 기준에 따라 세 부분으로 나눌 수 있습니다.

Input − 키-값 쌍 모음의 전체 데이터.

키 = 레코드의 성별 필드 값.

value = 해당 성별의 전체 레코드 데이터 값.

Method − 파티션 로직의 과정은 다음과 같습니다.

  • 입력 키-값 쌍에서 연령 필드 값을 읽습니다.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 다음 조건으로 연령 값을 확인하십시오.

    • 20 세 이하
    • 연령 20 세 이상 30 세 이하.
    • 30 세 이상.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output− 키-값 쌍의 전체 데이터는 키-값 쌍의 세 가지 모음으로 나뉩니다. Reducer는 각 컬렉션에서 개별적으로 작동합니다.

작업 감소

파티 셔너 작업의 수는 리듀서 작업의 수와 같습니다. 여기에는 세 개의 파티 셔너 작업이 있으므로 실행할 세 개의 Reducer 작업이 있습니다.

Input − Reducer는 서로 다른 키-값 쌍 모음으로 세 번 실행됩니다.

키 = 레코드의 성별 필드 값.

값 = 해당 성별의 전체 기록 데이터.

Method − 각 컬렉션에 다음 로직이 적용됩니다.

  • 각 레코드의 급여 필드 값을 읽습니다.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • max 변수로 급여를 확인하십시오. str [4]가 최대 급여이면 str [4]를 max에 할당하고 그렇지 않으면 단계를 건너 뜁니다.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • 각 키 컬렉션에 대해 1 단계와 2 단계를 반복합니다 (남성 및 여성이 키 컬렉션 임). 이 세 단계를 실행하면 남성 키 컬렉션에서 최대 급여 하나와 여성 키 컬렉션에서 최대 급여 하나를 찾을 수 있습니다.

context.write(new Text(key), new IntWritable(max));

Output− 마지막으로 서로 다른 연령 그룹의 세 가지 컬렉션에서 키-값 쌍 데이터 세트를 얻습니다. 여기에는 각 연령 그룹의 남성 컬렉션의 최대 급여와 여성 컬렉션의 최대 급여가 포함됩니다.

Map, Partitioner 및 Reduce 작업을 실행 한 후 세 가지 키-값 쌍 데이터 컬렉션이 출력으로 세 개의 다른 파일에 저장됩니다.

세 가지 작업은 모두 MapReduce 작업으로 처리됩니다. 이러한 작업의 다음 요구 사항 및 사양은 구성에 지정되어야합니다.

  • 직업 이름
  • 키와 값의 입력 및 출력 형식
  • Map, Reduce 및 Partitioner 작업을위한 개별 클래스
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

예제 프로그램

다음 프로그램은 MapReduce 프로그램에서 주어진 기준에 대한 파티 셔 너를 구현하는 방법을 보여줍니다.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

위의 코드를 다른 이름으로 저장하십시오. PartitionerExample.java"/ home / hadoop / hadoopPartitioner"에 있습니다. 프로그램의 컴파일 및 실행은 다음과 같습니다.

컴파일 및 실행

Hadoop 사용자의 홈 디렉토리 (예 : / home / hadoop)에 있다고 가정 해 보겠습니다.

위의 프로그램을 컴파일하고 실행하려면 아래 단계를 따르십시오.

Step 1− MapReduce 프로그램을 컴파일하고 실행하는 데 사용되는 Hadoop-core-1.2.1.jar를 다운로드합니다. mvnrepository.com 에서 jar를 다운로드 할 수 있습니다 .

다운로드 한 폴더가“/ home / hadoop / hadoopPartitioner”라고 가정하겠습니다.

Step 2 − 다음 명령은 프로그램 컴파일에 사용됩니다. PartitionerExample.java 프로그램을위한 jar를 생성합니다.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 − 다음 명령을 사용하여 HDFS에서 입력 디렉토리를 생성합니다.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 − 다음 명령을 사용하여 이름이 지정된 입력 파일을 복사합니다. input.txt HDFS의 입력 디렉토리에 있습니다.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 − 다음 명령을 사용하여 입력 디렉토리의 파일을 확인합니다.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 − 다음 명령을 사용하여 입력 디렉토리에서 입력 파일을 가져와 최고 급여 응용 프로그램을 실행합니다.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

파일이 실행될 때까지 잠시 기다리십시오. 실행 후 출력에는 여러 입력 분할, 매핑 작업 및 Reducer 작업이 포함됩니다.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 − 다음 명령을 사용하여 출력 폴더에서 결과 파일을 확인합니다.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

프로그램에서 3 개의 파티 셔 너와 3 개의 Reducer를 사용하고 있기 때문에 3 개의 파일에서 출력을 찾을 수 있습니다.

Step 8 − 다음 명령을 사용하여 출력을 확인하십시오. Part-00000파일. 이 파일은 HDFS에서 생성됩니다.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

다음 명령을 사용하여 출력을 확인하십시오. Part-00001 파일.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

다음 명령을 사용하여 출력을 확인하십시오. Part-00002 파일.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000