MapReduce-파티션 도구
파티 셔 너는 입력 데이터 세트를 처리 할 때 조건처럼 작동합니다. 파티션 단계는 매핑 단계 이후와 축소 단계 이전에 발생합니다.
파티 셔 너의 수는 리듀서의 수와 같습니다. 즉, 파티 셔 너는 리듀서의 수에 따라 데이터를 나눕니다. 따라서 단일 파티 셔너에서 전달 된 데이터는 단일 Reducer에서 처리됩니다.
파티 셔너
파티 셔 너는 중간 맵 출력의 키-값 쌍을 분할합니다. 해시 함수처럼 작동하는 사용자 정의 조건을 사용하여 데이터를 분할합니다. 총 파티션 수는 작업의 Reducer 작업 수와 동일합니다. 파티 셔 너가 어떻게 작동하는지 이해하기위한 예를 들어 보겠습니다.
MapReduce 파티션 도구 구현
편의를 위해 다음 데이터가 포함 된 Employee라는 작은 테이블이 있다고 가정하겠습니다. 이 샘플 데이터를 입력 데이터 세트로 사용하여 파티 셔 너가 작동하는 방식을 보여줍니다.
신분증 | 이름 | 나이 | 성별 | 봉급 |
---|---|---|---|---|
1201 | 고팔 | 45 | 남성 | 50,000 |
1202 | Manisha | 40 | 여자 | 50,000 |
1203 년 | 칼릴 | 34 | 남성 | 30,000 |
1204 | 프라 산스 | 30 | 남성 | 30,000 |
1205 년 | 키란 | 20 | 남성 | 40,000 |
1206 년 | Laxmi | 25 | 여자 | 35,000 |
1207 | 바비 아 | 20 | 여자 | 15,000 |
1208 년 | 레 쉬마 | 19 | 여자 | 15,000 |
1209 년 | Kranthi | 22 | 남성 | 22,000 |
1210 년 | Satish | 24 | 남성 | 25,000 |
1211 | 크리슈나 | 25 | 남성 | 25,000 |
1212 년 | 아르 샤드 | 28 | 남성 | 20,000 |
1213 년 | Lavanya | 18 | 여자 | 8,000 |
다른 연령대 (예 : 20 세 미만, 21 세 ~ 30 세, 30 세 이상)에서 성별로 가장 높은 급여를받는 직원을 찾기 위해 입력 데이터 세트를 처리하는 애플리케이션을 작성해야합니다.
입력 데이터
위의 데이터는 다음과 같이 저장됩니다. input.txt "/ home / hadoop / hadoopPartitioner"디렉토리에서 입력으로 제공됩니다.
1201 | 고팔 | 45 | 남성 | 50000 |
1202 | Manisha | 40 | 여자 | 51000 |
1203 년 | Khaleel | 34 | 남성 | 30000 |
1204 | 프라 산스 | 30 | 남성 | 31000 |
1205 년 | 키란 | 20 | 남성 | 40000 |
1206 년 | Laxmi | 25 | 여자 | 35000 |
1207 | 바비 아 | 20 | 여자 | 15000 |
1208 년 | 레 쉬마 | 19 | 여자 | 14000 |
1209 년 | Kranthi | 22 | 남성 | 22000 |
1210 년 | Satish | 24 | 남성 | 25000 |
1211 | 크리슈나 | 25 | 남성 | 26000 |
1212 년 | 아르 샤드 | 28 | 남성 | 20000 |
1213 년 | Lavanya | 18 | 여자 | 8000 |
주어진 입력에 따라 다음은 프로그램의 알고리즘 설명입니다.
작업 매핑
지도 작업은 텍스트 파일에 텍스트 데이터가있는 동안 키-값 쌍을 입력으로받습니다. 이지도 작업에 대한 입력은 다음과 같습니다.
Input − 키는 "모든 특수 키 + 파일 이름 + 줄 번호"(예 : 키 = @ input1)와 같은 패턴이고 값은 해당 줄의 데이터입니다 (예 : 값 = 1201 \ t gopal \ t 45 \ t 남성 \ t 50000).
Method −이지도 작업의 동작은 다음과 같습니다 −
읽기 value (레코드 데이터), 문자열의 인수 목록에서 입력 값으로 제공됩니다.
split 함수를 사용하여 성별을 분리하고 문자열 변수에 저장합니다.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
성별 정보 및 기록 데이터 보내기 value 매핑 작업에서 출력 키-값 쌍으로 partition task.
context.write(new Text(gender), new Text(value));
텍스트 파일의 모든 레코드에 대해 위의 모든 단계를 반복합니다.
Output − 성별 데이터와 레코드 데이터 값을 키-값 쌍으로 가져옵니다.
파티 셔너 작업
파티 셔너 태스크는 맵 태스크의 키-값 쌍을 입력으로받습니다. 파티션은 데이터를 세그먼트로 나누는 것을 의미합니다. 주어진 파티션 조건부 기준에 따라 입력 키-값 쌍 데이터는 연령 기준에 따라 세 부분으로 나눌 수 있습니다.
Input − 키-값 쌍 모음의 전체 데이터.
키 = 레코드의 성별 필드 값.
value = 해당 성별의 전체 레코드 데이터 값.
Method − 파티션 로직의 과정은 다음과 같습니다.
- 입력 키-값 쌍에서 연령 필드 값을 읽습니다.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
다음 조건으로 연령 값을 확인하십시오.
- 20 세 이하
- 연령 20 세 이상 30 세 이하.
- 30 세 이상.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output− 키-값 쌍의 전체 데이터는 키-값 쌍의 세 가지 모음으로 나뉩니다. Reducer는 각 컬렉션에서 개별적으로 작동합니다.
작업 감소
파티 셔너 작업의 수는 리듀서 작업의 수와 같습니다. 여기에는 세 개의 파티 셔너 작업이 있으므로 실행할 세 개의 Reducer 작업이 있습니다.
Input − Reducer는 서로 다른 키-값 쌍 모음으로 세 번 실행됩니다.
키 = 레코드의 성별 필드 값.
값 = 해당 성별의 전체 기록 데이터.
Method − 각 컬렉션에 다음 로직이 적용됩니다.
- 각 레코드의 급여 필드 값을 읽습니다.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
max 변수로 급여를 확인하십시오. str [4]가 최대 급여이면 str [4]를 max에 할당하고 그렇지 않으면 단계를 건너 뜁니다.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
각 키 컬렉션에 대해 1 단계와 2 단계를 반복합니다 (남성 및 여성이 키 컬렉션 임). 이 세 단계를 실행하면 남성 키 컬렉션에서 최대 급여 하나와 여성 키 컬렉션에서 최대 급여 하나를 찾을 수 있습니다.
context.write(new Text(key), new IntWritable(max));
Output− 마지막으로 서로 다른 연령 그룹의 세 가지 컬렉션에서 키-값 쌍 데이터 세트를 얻습니다. 여기에는 각 연령 그룹의 남성 컬렉션의 최대 급여와 여성 컬렉션의 최대 급여가 포함됩니다.
Map, Partitioner 및 Reduce 작업을 실행 한 후 세 가지 키-값 쌍 데이터 컬렉션이 출력으로 세 개의 다른 파일에 저장됩니다.
세 가지 작업은 모두 MapReduce 작업으로 처리됩니다. 이러한 작업의 다음 요구 사항 및 사양은 구성에 지정되어야합니다.
- 직업 이름
- 키와 값의 입력 및 출력 형식
- Map, Reduce 및 Partitioner 작업을위한 개별 클래스
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
예제 프로그램
다음 프로그램은 MapReduce 프로그램에서 주어진 기준에 대한 파티 셔 너를 구현하는 방법을 보여줍니다.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
위의 코드를 다른 이름으로 저장하십시오. PartitionerExample.java"/ home / hadoop / hadoopPartitioner"에 있습니다. 프로그램의 컴파일 및 실행은 다음과 같습니다.
컴파일 및 실행
Hadoop 사용자의 홈 디렉토리 (예 : / home / hadoop)에 있다고 가정 해 보겠습니다.
위의 프로그램을 컴파일하고 실행하려면 아래 단계를 따르십시오.
Step 1− MapReduce 프로그램을 컴파일하고 실행하는 데 사용되는 Hadoop-core-1.2.1.jar를 다운로드합니다. mvnrepository.com 에서 jar를 다운로드 할 수 있습니다 .
다운로드 한 폴더가“/ home / hadoop / hadoopPartitioner”라고 가정하겠습니다.
Step 2 − 다음 명령은 프로그램 컴파일에 사용됩니다. PartitionerExample.java 프로그램을위한 jar를 생성합니다.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 − 다음 명령을 사용하여 HDFS에서 입력 디렉토리를 생성합니다.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 − 다음 명령을 사용하여 이름이 지정된 입력 파일을 복사합니다. input.txt HDFS의 입력 디렉토리에 있습니다.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 − 다음 명령을 사용하여 입력 디렉토리의 파일을 확인합니다.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 − 다음 명령을 사용하여 입력 디렉토리에서 입력 파일을 가져와 최고 급여 응용 프로그램을 실행합니다.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
파일이 실행될 때까지 잠시 기다리십시오. 실행 후 출력에는 여러 입력 분할, 매핑 작업 및 Reducer 작업이 포함됩니다.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 − 다음 명령을 사용하여 출력 폴더에서 결과 파일을 확인합니다.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
프로그램에서 3 개의 파티 셔 너와 3 개의 Reducer를 사용하고 있기 때문에 3 개의 파일에서 출력을 찾을 수 있습니다.
Step 8 − 다음 명령을 사용하여 출력을 확인하십시오. Part-00000파일. 이 파일은 HDFS에서 생성됩니다.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
다음 명령을 사용하여 출력을 확인하십시오. Part-00001 파일.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
다음 명령을 사용하여 출력을 확인하십시오. Part-00002 파일.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000