MapReduce-コンバイナー
コンバイナー、別名 semi-reducer, は、Mapクラスからの入力を受け入れ、その後、出力キーと値のペアをReducerクラスに渡すことによって動作するオプションのクラスです。
コンバイナーの主な機能は、同じキーでマップ出力レコードを要約することです。コンバイナーの出力(Key-Valueコレクション)は、ネットワークを介して実際のレデューサータスクに入力として送信されます。
コンバイナー
Combinerクラスは、MapクラスとReduceクラスの間で使用され、MapとReduceの間のデータ転送の量を減らします。通常、mapタスクの出力は大きく、reduceタスクに転送されるデータは高くなります。
次のMapReduceタスク図は、COMBINERPHASEを示しています。
コンバイナーはどのように機能しますか?
MapReduceコンバイナーがどのように機能するかについての簡単な要約は次のとおりです-
コンバイナには事前定義されたインターフェイスがなく、Reducerインターフェイスのreduce()メソッドを実装する必要があります。
コンバイナは、各マップ出力キーで動作します。これは、Reducerクラスと同じ出力Key-Valueタイプを持っている必要があります。
コンバイナーは、元のMap出力を置き換えるため、大きなデータセットから要約情報を生成できます。
ただし、Combinerはオプションですが、Reduceフェーズでデータを複数のグループに分離するのに役立ち、処理が容易になります。
MapReduceコンバイナーの実装
次の例は、コンバイナに関する理論的な考え方を示しています。次の名前の入力テキストファイルがあると仮定します。input.txt MapReduceの場合。
What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance
Combinerを使用したMapReduceプログラムの重要なフェーズについて以下で説明します。
レコードリーダー
これはMapReduceの最初のフェーズであり、レコードリーダーは入力テキストファイルからすべての行をテキストとして読み取り、キーと値のペアとして出力を生成します。
Input −入力ファイルからの行ごとのテキスト。
Output−キーと値のペアを形成します。以下は、予想されるキーと値のペアのセットです。
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
マップフェーズ
マップフェーズは、レコードリーダーから入力を受け取り、それを処理して、キーと値のペアの別のセットとして出力を生成します。
Input −次のキーと値のペアは、レコードリーダーから取得した入力です。
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
マップフェーズでは、各キーと値のペアを読み取り、StringTokenizerを使用して各単語を値から分割し、各単語をキーとして扱い、その単語の数を値として扱います。次のコードスニペットは、Mapperクラスとmap関数を示しています。
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Output −期待される出力は次のとおりです−
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
コンバイナーフェーズ
コンバイナーフェーズは、マップフェーズから各キーと値のペアを取得して処理し、次のように出力を生成します。 key-value collection ペア。
Input −次のキーと値のペアは、マップフェーズから取得した入力です。
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
コンバイナーフェーズでは、各キーと値のペアを読み取り、一般的な単語をキーとして結合し、値をコレクションとして結合します。通常、コンバイナーのコードと操作はレデューサーのコードと操作に似ています。以下は、Mapper、Combiner、Reducerクラス宣言のコードスニペットです。
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
Output −期待される出力は次のとおりです−
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
レデューサーフェーズ
レデューサーフェーズは、コンバイナーフェーズから各キーと値の収集ペアを取得して処理し、出力をキーと値のペアとして渡します。コンバイナの機能はレデューサと同じであることに注意してください。
Input −次のキーと値のペアは、コンバイナーフェーズから取得した入力です。
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
レデューサーフェーズは、各キーと値のペアを読み取ります。以下は、コンバイナーのコードスニペットです。
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Output −レデューサーフェーズからの期待される出力は次のとおりです。
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
レコードライター
これはMapReduceの最後のフェーズであり、レコードライターはReducerフェーズからすべてのキーと値のペアを書き込み、出力をテキストとして送信します。
Input −出力形式とともにReducerフェーズからの各Key-Valueペア。
Output−キーと値のペアをテキスト形式で提供します。以下は期待される出力です。
What 3
do 2
you 2
mean 1
by 1
Object 1
know 1
about 1
Java 3
is 1
Virtual 1
Machine 1
How 1
enabled 1
High 1
Performance 1
サンプルプログラム
次のコードブロックは、プログラム内の単語数をカウントします。
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
上記のプログラムを次のように保存します WordCount.java。プログラムのコンパイルと実行を以下に示します。
コンパイルと実行
Hadoopユーザーのホームディレクトリ(たとえば、/ home / hadoop)にいると仮定します。
上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。
Step 1 −次のコマンドを使用して、コンパイルされたJavaクラスを格納するディレクトリを作成します。
$ mkdir units
Step 2−MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。あなたはmvnrepository.comからjarファイルをダウンロードすることができます。
ダウンロードしたフォルダーが/ home / hadoop /であると仮定します。
Step 3 −次のコマンドを使用して、 WordCount.java プログラムとプログラムのjarファイルを作成します。
$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .
Step 4 −次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 −次のコマンドを使用して、という名前の入力ファイルをコピーします input.txt HDFSの入力ディレクトリにあります。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir
Step 6 −次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 −次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してワードカウントアプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
ファイルが実行されるまでしばらく待ちます。実行後、出力にはいくつかの入力分割、マップタスク、およびレデューサータスクが含まれます。
Step 8 −次のコマンドを使用して、出力フォルダー内の結果のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 −次のコマンドを使用して、の出力を確認します。 Part-00000ファイル。このファイルはHDFSによって生成されます。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下は、MapReduceプログラムによって生成された出力です。
What 3
do 2
you 2
mean 1
by 1
Object 1
know 1
about 1
Java 3
is 1
Virtual 1
Machine 1
How 1
enabled 1
High 1
Performance 1