MapReduce-パーティショナー
パーティショナーは、入力データセットを処理する際の条件のように機能します。パーティションフェーズは、マップフェーズの後、リデュースフェーズの前に実行されます。
パーティショナーの数は、レデューサーの数と同じです。つまり、パーティショナーはレデューサーの数に応じてデータを分割します。したがって、単一のパーティショナーから渡されたデータは、単一のレデューサーによって処理されます。
パーティショナー
パーティショナーは、中間Map出力のキーと値のペアをパーティション化します。ハッシュ関数のように機能するユーザー定義の条件を使用してデータを分割します。パーティションの総数は、ジョブのレデューサータスクの数と同じです。パーティショナーがどのように機能するかを理解するために例を見てみましょう。
MapReduceパーティショナーの実装
便宜上、次のデータを持つEmployeeという小さなテーブルがあると仮定します。このサンプルデータを入力データセットとして使用して、パーティショナーがどのように機能するかを示します。
Id | 名前 | 年齢 | 性別 | 給料 |
---|---|---|---|---|
1201 | ゴパル | 45 | 男性 | 50,000 |
1202 | マニシャ | 40 | 女性 | 50,000 |
1203 | カリル | 34 | 男性 | 30,000 |
1204 | プラシャーント | 30 | 男性 | 30,000 |
1205 | キラン | 20 | 男性 | 40,000 |
1206 | ラクシュミ | 25 | 女性 | 35,000 |
1207 | bhavya | 20 | 女性 | 15,000 |
1208 | レシュマ | 19 | 女性 | 15,000 |
1209 | クランティ | 22 | 男性 | 22,000 |
1210 | サティッシュ | 24 | 男性 | 25,000 |
1211 | クリシュナ | 25 | 男性 | 25,000 |
1212 | アーシャッド | 28 | 男性 | 20,000 |
1213 | ラヴァニャ | 18 | 女性 | 8,000 |
入力データセットを処理して、さまざまな年齢層(たとえば、20歳未満、21歳から30歳、30歳以上)の性別で最も給与の高い従業員を見つけるアプリケーションを作成する必要があります。
入力データ
上記のデータは次のように保存されます input.txt 「/ home / hadoop / hadoopPartitioner」ディレクトリにあり、入力として指定されます。
1201 | ゴパル | 45 | 男性 | 50000 |
1202 | マニシャ | 40 | 女性 | 51000 |
1203 | khaleel | 34 | 男性 | 30000 |
1204 | プラシャーント | 30 | 男性 | 31000 |
1205 | キラン | 20 | 男性 | 40000 |
1206 | ラクシュミ | 25 | 女性 | 35000 |
1207 | bhavya | 20 | 女性 | 15000 |
1208 | レシュマ | 19 | 女性 | 14000 |
1209 | クランティ | 22 | 男性 | 22000 |
1210 | サティッシュ | 24 | 男性 | 25000 |
1211 | クリシュナ | 25 | 男性 | 26000 |
1212 | アーシャッド | 28 | 男性 | 20000 |
1213 | ラヴァニャ | 18 | 女性 | 8000 |
与えられた入力に基づいて、以下はプログラムのアルゴリズムの説明です。
マップタスク
マップタスクは、テキストファイルにテキストデータがある間、入力としてキーと値のペアを受け入れます。このマップタスクの入力は次のとおりです-
Input −キーは「任意の特別なキー+ファイル名+行番号」(例:key = @ input1)などのパターンであり、値はその行のデータ(例:value = 1201 \ t gopal \ t 45 \ t男性\ t 50000)。
Method −このマップタスクの操作は次のとおりです。
読む value (レコードデータ)。これは、文字列の引数リストからの入力値として提供されます。
split関数を使用して、性別を分離し、文字列変数に格納します。
String[] str = value.toString().split("\t", -3);
String gender=str[3];
性別情報と記録データを送信する value マップタスクからの出力キーと値のペアとして partition task。
context.write(new Text(gender), new Text(value));
テキストファイル内のすべてのレコードについて、上記のすべての手順を繰り返します。
Output −性別データとレコードデータ値をキーと値のペアとして取得します。
パーティショナータスク
パーティショナータスクは、マップタスクからのキーと値のペアを入力として受け入れます。パーティションとは、データをセグメントに分割することを意味します。パーティションの指定された条件付き基準に従って、入力されたキーと値のペアのデータは、年齢基準に基づいて3つの部分に分割できます。
Input −キーと値のペアのコレクション内のデータ全体。
key =レコードの性別フィールド値。
value =その性別の全レコードデータ値。
Method −パーティションロジックのプロセスは次のように実行されます。
- 入力キーと値のペアから年齢フィールドの値を読み取ります。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
以下の条件で年齢値を確認してください。
- 20歳以下の年齢
- 20歳以上30歳以下の年齢。
- 30歳以上。
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output−キーと値のペアのデータ全体は、キーと値のペアの3つのコレクションに分割されます。レデューサーは、各コレクションで個別に機能します。
タスクを減らす
パーティショナータスクの数は、レデューサータスクの数と同じです。ここでは、3つのパーティショナータスクがあるため、実行する3つのレデューサータスクがあります。
Input −レデューサーは、キーと値のペアの異なるコレクションで3回実行されます。
key =レコード内の性別フィールド値。
値=その性別のレコードデータ全体。
Method −次のロジックが各コレクションに適用されます。
- 各レコードのSalaryフィールド値を読み取ります。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
max変数で給与を確認してください。str [4]が最大給与の場合は、str [4]をmaxに割り当てます。それ以外の場合は、手順をスキップします。
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
キーコレクションごとに手順1と2を繰り返します(男性と女性がキーコレクションです)。これらの3つの手順を実行すると、男性のキーコレクションから最大給与が1つ、女性のキーコレクションから最大給与が1つ見つかります。
context.write(new Text(key), new IntWritable(max));
Output−最後に、異なる年齢層の3つのコレクションのKey-Valueペアデータのセットを取得します。これには、各年齢層の男性コレクションの最高給与と女性コレクションの最高給与がそれぞれ含まれています。
Map、Partitioner、およびReduceタスクを実行した後、Key-Valueペアデータの3つのコレクションは、出力として3つの異なるファイルに保存されます。
3つのタスクはすべてMapReduceジョブとして扱われます。これらのジョブの次の要件と仕様は、構成で指定する必要があります-
- 職種名
- キーと値の入力および出力形式
- Map、Reduce、Partitionerタスクの個々のクラス
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
サンプルプログラム
次のプログラムは、MapReduceプログラムで特定の基準のパーティショナーを実装する方法を示しています。
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
上記のコードを次のように保存します PartitionerExample.java「/ home / hadoop / hadoopPartitioner」にあります。プログラムのコンパイルと実行を以下に示します。
コンパイルと実行
Hadoopユーザーのホームディレクトリ(たとえば、/ home / hadoop)にいると仮定します。
上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。
Step 1−MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。あなたはmvnrepository.comからjarファイルをダウンロードすることができます。
ダウンロードしたフォルダーが「/ home / hadoop / hadoopPartitioner」であると仮定します。
Step 2 −プログラムのコンパイルには以下のコマンドを使用します PartitionerExample.java プログラムのjarファイルを作成します。
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 −次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 −次のコマンドを使用して、という名前の入力ファイルをコピーします input.txt HDFSの入力ディレクトリにあります。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 −次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 −次のコマンドを使用して、入力ディレクトリから入力ファイルを取得して、トップ給与アプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
ファイルが実行されるまでしばらく待ちます。実行後、出力にはいくつかの入力分割、マップタスク、およびレデューサータスクが含まれます。
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 −次のコマンドを使用して、出力フォルダー内の結果のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
プログラムで3つのパーティショナーと3つのレデューサーを使用しているため、出力は3つのファイルにあります。
Step 8 −次のコマンドを使用して、の出力を確認します。 Part-00000ファイル。このファイルはHDFSによって生成されます。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
次のコマンドを使用して、の出力を確認します。 Part-00001 ファイル。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
次のコマンドを使用して、の出力を確認します。 Part-00002 ファイル。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000