MapReduce-パーティショナー

パーティショナーは、入力データセットを処理する際の条件のように機能します。パーティションフェーズは、マップフェーズの後、リデュースフェーズの前に実行されます。

パーティショナーの数は、レデューサーの数と同じです。つまり、パーティショナーはレデューサーの数に応じてデータを分割します。したがって、単一のパーティショナーから渡されたデータは、単一のレデューサーによって処理されます。

パーティショナー

パーティショナーは、中間Map出力のキーと値のペアをパーティション化します。ハッシュ関数のように機能するユーザー定義の条件を使用してデータを分割します。パーティションの総数は、ジョブのレデューサータスクの数と同じです。パーティショナーがどのように機能するかを理解するために例を見てみましょう。

MapReduceパーティショナーの実装

便宜上、次のデータを持つEmployeeという小さなテーブルがあると仮定します。このサンプルデータを入力データセットとして使用して、パーティショナーがどのように機能するかを示します。

Id 名前 年齢 性別 給料
1201 ゴパル 45 男性 50,000
1202 マニシャ 40 女性 50,000
1203 カリル 34 男性 30,000
1204 プラシャーント 30 男性 30,000
1205 キラン 20 男性 40,000
1206 ラクシュミ 25 女性 35,000
1207 bhavya 20 女性 15,000
1208 レシュマ 19 女性 15,000
1209 クランティ 22 男性 22,000
1210 サティッシュ 24 男性 25,000
1211 クリシュナ 25 男性 25,000
1212 アーシャッド 28 男性 20,000
1213 ラヴァニャ 18 女性 8,000

入力データセットを処理して、さまざまな年齢層(たとえば、20歳未満、21歳から30歳、30歳以上)の性別で最も給与の高い従業員を見つけるアプリケーションを作成する必要があります。

入力データ

上記のデータは次のように保存されます input.txt 「/ home / hadoop / hadoopPartitioner」ディレクトリにあり、入力として指定されます。

1201 ゴパル 45 男性 50000
1202 マニシャ 40 女性 51000
1203 khaleel 34 男性 30000
1204 プラシャーント 30 男性 31000
1205 キラン 20 男性 40000
1206 ラクシュミ 25 女性 35000
1207 bhavya 20 女性 15000
1208 レシュマ 19 女性 14000
1209 クランティ 22 男性 22000
1210 サティッシュ 24 男性 25000
1211 クリシュナ 25 男性 26000
1212 アーシャッド 28 男性 20000
1213 ラヴァニャ 18 女性 8000

与えられた入力に基づいて、以下はプログラムのアルゴリズムの説明です。

マップタスク

マップタスクは、テキストファイルにテキストデータがある間、入力としてキーと値のペアを受け入れます。このマップタスクの入力は次のとおりです-

Input −キーは「任意の特別なキー+ファイル名+行番号」(例:key = @ input1)などのパターンであり、値はその行のデータ(例:value = 1201 \ t gopal \ t 45 \ t男性\ t 50000)。

Method −このマップタスクの操作は次のとおりです。

  • 読む value (レコードデータ)。これは、文字列の引数リストからの入力値として提供されます。

  • split関数を使用して、性別を分離し、文字列変数に格納します。

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • 性別情報と記録データを送信する value マップタスクからの出力キーと値のペアとして partition task

context.write(new Text(gender), new Text(value));
  • テキストファイル内のすべてのレコードについて、上記のすべての手順を繰り返します。

Output −性別データとレコードデータ値をキーと値のペアとして取得します。

パーティショナータスク

パーティショナータスクは、マップタスクからのキーと値のペアを入力として受け入れます。パーティションとは、データをセグメントに分割することを意味します。パーティションの指定された条件付き基準に従って、入力されたキーと値のペアのデータは、年齢基準に基づいて3つの部分に分割できます。

Input −キーと値のペアのコレクション内のデータ全体。

key =レコードの性別フィールド値。

value =その性別の全レコードデータ値。

Method −パーティションロジックのプロセスは次のように実行されます。

  • 入力キーと値のペアから年齢フィールドの値を読み取ります。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 以下の条件で年齢値を確認してください。

    • 20歳以下の年齢
    • 20歳以上30歳以下の年齢。
    • 30歳以上。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output−キーと値のペアのデータ全体は、キーと値のペアの3つのコレクションに分割されます。レデューサーは、各コレクションで個別に機能します。

タスクを減らす

パーティショナータスクの数は、レデューサータスクの数と同じです。ここでは、3つのパーティショナータスクがあるため、実行する3つのレデューサータスクがあります。

Input −レデューサーは、キーと値のペアの異なるコレクションで3回実行されます。

key =レコード内の性別フィールド値。

値=その性別のレコードデータ全体。

Method −次のロジックが各コレクションに適用されます。

  • 各レコードのSalaryフィールド値を読み取ります。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • max変数で給与を確認してください。str [4]が最大給与の場合は、str [4]をmaxに割り当てます。それ以外の場合は、手順をスキップします。

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • キーコレクションごとに手順1と2を繰り返します(男性と女性がキーコレクションです)。これらの3つの手順を実行すると、男性のキーコレクションから最大給与が1つ、女性のキーコレクションから最大給与が1つ見つかります。

context.write(new Text(key), new IntWritable(max));

Output−最後に、異なる年齢層の3つのコレクションのKey-Valueペアデータのセットを取得します。これには、各年齢層の男性コレクションの最高給与と女性コレクションの最高給与がそれぞれ含まれています。

Map、Partitioner、およびReduceタスクを実行した後、Key-Valueペアデータの3つのコレクションは、出力として3つの異なるファイルに保存されます。

3つのタスクはすべてMapReduceジョブとして扱われます。これらのジョブの次の要件と仕様は、構成で指定する必要があります-

  • 職種名
  • キーと値の入力および出力形式
  • Map、Reduce、Partitionerタスクの個々のクラス
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

サンプルプログラム

次のプログラムは、MapReduceプログラムで特定の基準のパーティショナーを実装する方法を示しています。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

上記のコードを次のように保存します PartitionerExample.java「/ home / hadoop / hadoopPartitioner」にあります。プログラムのコンパイルと実行を以下に示します。

コンパイルと実行

Hadoopユーザーのホームディレクトリ(たとえば、/ home / hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

Step 1−MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。あなたはmvnrepository.comからjarファイルをダウンロードすることができます。

ダウンロードしたフォルダーが「/ home / hadoop / hadoopPartitioner」であると仮定します。

Step 2 −プログラムのコンパイルには以下のコマンドを使用します PartitionerExample.java プログラムのjarファイルを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 −次のコマンドを使用して、HDFSに入力ディレクトリを作成します。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 −次のコマンドを使用して、という名前の入力ファイルをコピーします input.txt HDFSの入力ディレクトリにあります。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 −次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 −次のコマンドを使用して、入力ディレクトリから入力ファイルを取得して、トップ給与アプリケーションを実行します。

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

ファイルが実行されるまでしばらく待ちます。実行後、出力にはいくつかの入力分割、マップタスク、およびレデューサータスクが含まれます。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 −次のコマンドを使用して、出力フォルダー内の結果のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

プログラムで3つのパーティショナーと3つのレデューサーを使用しているため、出力は3つのファイルにあります。

Step 8 −次のコマンドを使用して、の出力を確認します。 Part-00000ファイル。このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

次のコマンドを使用して、の出力を確認します。 Part-00001 ファイル。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

次のコマンドを使用して、の出力を確認します。 Part-00002 ファイル。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000