MapReduce-Hadoopの実装
MapReduceは、コモディティハードウェアの大規模なクラスター上の大量のデータを信頼できる方法で処理するアプリケーションを作成するために使用されるフレームワークです。この章では、Javaを使用したHadoopフレームワークでのMapReduceの操作について説明します。
MapReduceアルゴリズム
一般に、MapReduceパラダイムは、実際のデータが存在するコンピューターにmap-reduceプログラムを送信することに基づいています。
MapReduceジョブ中に、HadoopはMapタスクとReduceタスクをクラスター内の適切なサーバーに送信します。
フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター周辺でのデータのコピーなど、データ受け渡しのすべての詳細を管理します。
ほとんどのコンピューティングは、ネットワークトラフィックを削減するローカルディスク上のデータを持つノードで実行されます。
特定のタスクを完了した後、クラスターはデータを収集および削減して適切な結果を形成し、それをHadoopサーバーに送り返します。
入力と出力(Javaパースペクティブ)
MapReduceフレームワークは、キーと値のペアで動作します。つまり、フレームワークは、ジョブへの入力をキーと値のペアのセットとして表示し、キーと値のペアのセットを、おそらく異なるタイプのジョブの出力として生成します。
キークラスと値クラスはフレームワークによってシリアル化可能である必要があるため、書き込み可能なインターフェイスを実装する必要があります。さらに、主要なクラスは、フレームワークによる並べ替えを容易にするためにWritableComparableインターフェイスを実装する必要があります。
MapReduceジョブの入力形式と出力形式はどちらも、キーと値のペアの形式です。
(入力)<k1、v1>-> map-> <k2、v2>-> reduce-> <k3、v3>(出力)。
入力 | 出力 | |
---|---|---|
地図 | <k1、v1> | リスト(<k2、v2>) |
減らす | <k2、list(v2)> | リスト(<k3、v3>) |
MapReduceの実装
次の表は、組織の電力消費に関するデータを示しています。この表には、5年連続の月間電力消費量と年平均が含まれています。
1月 | 2月 | 3月 | 4月 | 五月 | 6月 | 7月 | 8月 | 9月 | 10月 | 11月 | 12月 | 平均 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979年 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980年 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981年 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984年 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985年 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
指定されたテーブルの入力データを処理して、最大使用年、最小使用年などを見つけるアプリケーションを作成する必要があります。このタスクは、必要な出力を生成するロジックを記述し、記述されたアプリケーションにデータを渡すだけなので、レコード数が限られているプログラマーにとっては簡単です。
ここで、入力データのスケールを上げてみましょう。特定の州のすべての大規模産業の電力消費量を分析する必要があると仮定します。このようなバルクデータを処理するアプリケーションを作成する場合、
それらは実行するのに多くの時間がかかります。
データをソースからネットワークサーバーに移動すると、ネットワークトラフィックが大量に発生します。
これらの問題を解決するために、MapReduceフレームワークがあります。
入力データ
上記のデータは次のように保存されます sample.txtそして入力として与えられます。入力ファイルは次のようになります。
1979年 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980年 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981年 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984年 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985年 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
サンプルプログラム
サンプルデータの次のプログラムは、MapReduceフレームワークを使用しています。
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
上記のプログラムをに保存します ProcessUnits.java。プログラムのコンパイルと実行を以下に示します。
ProcessUnitsプログラムのコンパイルと実行
Hadoopユーザーのホームディレクトリ(例:/ home / hadoop)にいると仮定します。
上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。
Step 1 −次のコマンドを使用して、コンパイルされたJavaクラスを格納するディレクトリを作成します。
$ mkdir units
Step 2−MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。mvnrepository.comからjarファイルをダウンロードします。ダウンロードフォルダが/ home / hadoop /であると仮定します。
Step 3 −以下のコマンドを使用して、 ProcessUnits.java プログラムとプログラムのjarファイルを作成します。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 −次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 −次のコマンドを使用して、という名前の入力ファイルをコピーします。 sample.txt HDFSの入力ディレクトリにあります。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 −次のコマンドを使用して、入力ディレクトリ内のファイルを確認します
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 −次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行します。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
ファイルが実行されるまでしばらく待ちます。実行後、出力にはいくつかの入力分割、マップタスク、レデューサータスクなどが含まれます。
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 −次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 −次のコマンドを使用して、の出力を確認します。 Part-00000ファイル。このファイルはHDFSによって生成されます。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下は、MapReduceプログラムによって生成された出力です-
1981年 | 34 |
1984年 | 40 |
1985年 | 45 |
Step 10 −次のコマンドを使用して、出力フォルダーをHDFSからローカルファイルシステムにコピーします。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop