MapReduce-Hadoopの実装

MapReduceは、コモディティハードウェアの大規模なクラスター上の大量のデータを信頼できる方法で処理するアプリケーションを作成するために使用されるフレームワークです。この章では、Javaを使用したHadoopフレームワークでのMapReduceの操作について説明します。

MapReduceアルゴリズム

一般に、MapReduceパラダイムは、実際のデータが存在するコンピューターにmap-reduceプログラムを送信することに基づいています。

  • MapReduceジョブ中に、HadoopはMapタスクとReduceタスクをクラスター内の適切なサーバーに送信します。

  • フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター周辺でのデータのコピーなど、データ受け渡しのすべての詳細を管理します。

  • ほとんどのコンピューティングは、ネットワークトラフィックを削減するローカルディスク上のデータを持つノードで実行されます。

  • 特定のタスクを完了した後、クラスターはデータを収集および削減して適切な結果を形成し、それをHadoopサーバーに送り返します。

入力と出力(Javaパースペクティブ)

MapReduceフレームワークは、キーと値のペアで動作します。つまり、フレームワークは、ジョブへの入力をキーと値のペアのセットとして表示し、キーと値のペアのセットを、おそらく異なるタイプのジョブの出力として生成します。

キークラスと値クラスはフレームワークによってシリアル化可能である必要があるため、書き込み可能なインターフェイスを実装する必要があります。さらに、主要なクラスは、フレームワークによる並べ替えを容易にするためにWritableComparableインターフェイスを実装する必要があります。

MapReduceジョブの入力形式と出力形式はどちらも、キーと値のペアの形式です。

(入力)<k1、v1>-> map-> <k2、v2>-> reduce-> <k3、v3>(出力)。

入力 出力
地図 <k1​​、v1> リスト(<k2、v2>)
減らす <k2、list(v2)> リスト(<k3、v3>)

MapReduceの実装

次の表は、組織の電力消費に関するデータを示しています。この表には、5年連続の月間電力消費量と年平均が含まれています。

1月 2月 3月 4月 五月 6月 7月 8月 9月 10月 11月 12月 平均
1979年 23 23 2 43 24 25 26 26 26 26 25 26 25
1980年 26 27 28 28 28 30 31 31 31 30 30 30 29
1981年 31 32 32 32 33 34 35 36 36 34 34 34 34
1984年 39 38 39 39 39 41 42 43 40 39 38 38 40
1985年 38 39 39 39 39 41 41 41 00 40 39 39 45

指定されたテーブルの入力データを処理して、最大使用年、最小使用年などを見つけるアプリケーションを作成する必要があります。このタスクは、必要な出力を生成するロジックを記述し、記述されたアプリケーションにデータを渡すだけなので、レコード数が限られているプログラマーにとっては簡単です。

ここで、入力データのスケールを上げてみましょう。特定の州のすべての大規模産業の電力消費量を分析する必要があると仮定します。このようなバルクデータを処理するアプリケーションを作成する場合、

  • それらは実行するのに多くの時間がかかります。

  • データをソースからネットワークサーバーに移動すると、ネットワークトラフィックが大量に発生します。

これらの問題を解決するために、MapReduceフレームワークがあります。

入力データ

上記のデータは次のように保存されます sample.txtそして入力として与えられます。入力ファイルは次のようになります。

1979年 23 23 2 43 24 25 26 26 26 26 25 26 25
1980年 26 27 28 28 28 30 31 31 31 30 30 30 29
1981年 31 32 32 32 33 34 35 36 36 34 34 34 34
1984年 39 38 39 39 39 41 42 43 40 39 38 38 40
1985年 38 39 39 39 39 41 41 41 00 40 39 39 45

サンプルプログラム

サンプルデータの次のプログラムは、MapReduceフレームワークを使用しています。

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

上記のプログラムをに保存します ProcessUnits.java。プログラムのコンパイルと実行を以下に示します。

ProcessUnitsプログラムのコンパイルと実行

Hadoopユーザーのホームディレクトリ(例:/ home / hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

Step 1 −次のコマンドを使用して、コンパイルされたJavaクラスを格納するディレクトリを作成します。

$ mkdir units

Step 2−MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。mvnrepository.comからjarファイルをダウンロードします。ダウンロードフォルダが/ home / hadoop /であると仮定します。

Step 3 −以下のコマンドを使用して、 ProcessUnits.java プログラムとプログラムのjarファイルを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 −次のコマンドを使用して、HDFSに入力ディレクトリを作成します。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 −次のコマンドを使用して、という名前の入力ファイルをコピーします。 sample.txt HDFSの入力ディレクトリにあります。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 −次のコマンドを使用して、入力ディレクトリ内のファイルを確認します

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 −次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行します。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。実行後、出力にはいくつかの入力分割、マップタスク、レデューサータスクなどが含まれます。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 −次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 −次のコマンドを使用して、の出力を確認します。 Part-00000ファイル。このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下は、MapReduceプログラムによって生成された出力です-

1981年 34
1984年 40
1985年 45

Step 10 −次のコマンドを使用して、出力フォルダーをHDFSからローカルファイルシステムにコピーします。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop