Hadoop-MapReduce
MapReduceは、コモディティハードウェアの大規模なクラスター上で、信頼性の高い方法で大量のデータを並行して処理するアプリケーションを作成するために使用できるフレームワークです。
MapReduceとは何ですか?
MapReduceは、Javaに基づく分散コンピューティングのための処理技術およびプログラムモデルです。MapReduceアルゴリズムには、MapとReduceという2つの重要なタスクが含まれています。Mapはデータのセットを取得し、それを別のデータのセットに変換します。ここで、個々の要素はタプル(キーと値のペア)に分割されます。次に、タスクを減らします。これは、マップからの出力を入力として受け取り、それらのデータタプルをより小さなタプルのセットに結合します。MapReduceという名前のシーケンスが示すように、reduceタスクは常にマップジョブの後に実行されます。
MapReduceの主な利点は、複数のコンピューティングノード間でデータ処理を簡単にスケーリングできることです。MapReduceモデルでは、データ処理プリミティブはマッパーおよびリデューサーと呼ばれます。データ処理アプリケーションをマッパーとリデューサーに分解することは、簡単ではない場合があります。ただし、MapReduceフォームでアプリケーションを作成すると、クラスター内で数百、数千、さらには数万のマシンを実行するようにアプリケーションをスケーリングするだけで、構成を変更するだけです。この単純なスケーラビリティは、多くのプログラマーがMapReduceモデルを使用するように惹きつけてきたものです。
アルゴリズム
一般に、MapReduceパラダイムは、データが存在する場所にコンピューターを送信することに基づいています。
MapReduceプログラムは、マップステージ、シャッフルステージ、リデュースステージの3つのステージで実行されます。
Map stage−マップまたはマッパーの仕事は、入力データを処理することです。通常、入力データはファイルまたはディレクトリの形式であり、Hadoopファイルシステム(HDFS)に保存されます。入力ファイルは、マッパー関数に1行ずつ渡されます。マッパーはデータを処理し、データのいくつかの小さなチャンクを作成します。
Reduce stage −この段階は、 Shuffle ステージと Reduceステージ。レデューサーの仕事は、マッパーからのデータを処理することです。処理後、新しい出力セットが生成され、HDFSに保存されます。
MapReduceジョブ中に、HadoopはMapタスクとReduceタスクをクラスター内の適切なサーバーに送信します。
フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター周辺でのデータのコピーなど、データ受け渡しのすべての詳細を管理します。
ほとんどのコンピューティングは、ネットワークトラフィックを削減するローカルディスク上のデータを持つノードで実行されます。
指定されたタスクの完了後、クラスターはデータを収集および削減して適切な結果を形成し、それをHadoopサーバーに送り返します。
入力と出力(Javaパースペクティブ)
MapReduceフレームワークは<key、value>ペアで動作します。つまり、フレームワークはジョブへの入力を<key、value>ペアのセットとして表示し、<key、value>ペアのセットをジョブの出力として生成します。 、おそらく異なるタイプのもの。
キークラスと値クラスはフレームワークによってシリアル化されている必要があるため、書き込み可能なインターフェイスを実装する必要があります。さらに、キークラスは、フレームワークによる並べ替えを容易にするために、Writable-Comparableインターフェイスを実装する必要があります。の入力および出力タイプMapReduce job −(入力)<k1、v1>→map→<k2、v2>→reduce→<k3、v3>(出力)。
入力 | 出力 | |
---|---|---|
地図 | <k1、v1> | リスト(<k2、v2>) |
減らす | <k2、list(v2)> | リスト(<k3、v3>) |
用語
PayLoad −アプリケーションは、Map関数とReduce関数を実装し、ジョブの中核を形成します。
Mapper −マッパーは、入力キー/値ペアを中間キー/値ペアのセットにマップします。
NamedNode − Hadoop分散ファイルシステム(HDFS)を管理するノード。
DataNode −処理が行われる前にデータが事前に提示されるノード。
MasterNode − JobTrackerが実行され、クライアントからのジョブ要求を受け入れるノード。
SlaveNode − Map andReduceプログラムが実行されるノード。
JobTracker −ジョブをスケジュールし、タスクトラッカーへの割り当てジョブを追跡します。
Task Tracker −タスクを追跡し、ステータスをJobTrackerに報告します。
Job −プログラムは、データセット全体でのマッパーとリデューサーの実行です。
Task −データのスライスに対するマッパーまたはレデューサーの実行。
Task Attempt −SlaveNodeでタスクを実行しようとする特定のインスタンス。
シナリオ例
以下は、組織の電力消費に関するデータです。毎月の電力消費量とさまざまな年の年間平均が含まれています。
1月 | 2月 | 3月 | 4月 | 五月 | 6月 | 7月 | 8月 | 9月 | 10月 | 11月 | 12月 | 平均 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979年 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980年 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981年 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984年 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985年 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
上記のデータが入力として与えられた場合、それを処理し、最大使用年、最小使用年などの結果を生成するアプリケーションを作成する必要があります。これは、レコード数が有限のプログラマーにとってのウォークオーバーです。必要な出力を生成するロジックを記述し、書き込まれたアプリケーションにデータを渡すだけです。
しかし、特定の州のすべての大規模産業の設立以来の電力消費量を表すデータについて考えてみてください。
このようなバルクデータを処理するアプリケーションを作成する場合、
それらは実行するのに多くの時間がかかります。
ソースからネットワークサーバーなどにデータを移動すると、ネットワークトラフィックが大量に発生します。
これらの問題を解決するために、MapReduceフレームワークがあります。
入力データ
上記のデータは次のように保存されます sample.txtそして入力として与えられます。入力ファイルは次のようになります。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
サンプルプログラム
以下に、MapReduceフレームワークを使用したサンプルデータへのプログラムを示します。
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits {
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable ,/*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()) {
lasttoken = s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
//Reduce function
public void reduce( Text key, Iterator <IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxavg = 30;
int val = Integer.MIN_VALUE;
while (values.hasNext()) {
if((val = values.next().get())>maxavg) {
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception {
JobConf conf = new JobConf(ProcessUnits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
上記のプログラムを次のように保存します ProcessUnits.java. プログラムのコンパイルと実行について以下に説明します。
プロセスユニットプログラムのコンパイルと実行
Hadoopユーザーのホームディレクトリ(例:/ home / hadoop)にいると仮定します。
上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。
ステップ1
次のコマンドは、コンパイルされたJavaクラスを格納するディレクトリを作成することです。
$ mkdir units
ステップ2
ダウンロード Hadoop-core-1.2.1.jar,これは、MapReduceプログラムをコンパイルして実行するために使用されます。次のリンクmvnrepository.comにアクセスして、jarをダウンロードしてください。ダウンロードしたフォルダが/home/hadoop/.
ステップ3
次のコマンドは、コンパイルに使用されます ProcessUnits.java プログラムとプログラムのjarファイルを作成します。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
ステップ4
次のコマンドを使用して、HDFSに入力ディレクトリを作成します。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
ステップ5
次のコマンドを使用して、という名前の入力ファイルをコピーします sample.txtHDFSの入力ディレクトリにあります。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
ステップ6
次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
ステップ7
次のコマンドは、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行するために使用されます。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
ファイルが実行されるまでしばらく待ちます。実行後、以下に示すように、出力には、入力分割の数、マップタスクの数、レデューサータスクの数などが含まれます。
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2 Job Counters
Launched map tasks = 2
Launched reduce tasks = 1
Data-local map tasks = 2
Total time spent by all maps in occupied slots (ms) = 146137
Total time spent by all reduces in occupied slots (ms) = 441
Total time spent by all map tasks (ms) = 14613
Total time spent by all reduce tasks (ms) = 44120
Total vcore-seconds taken by all map tasks = 146137
Total vcore-seconds taken by all reduce tasks = 44120
Total megabyte-seconds taken by all map tasks = 149644288
Total megabyte-seconds taken by all reduce tasks = 45178880
Map-Reduce Framework
Map input records = 5
Map output records = 5
Map output bytes = 45
Map output materialized bytes = 67
Input split bytes = 208
Combine input records = 5
Combine output records = 5
Reduce input groups = 5
Reduce shuffle bytes = 6
Reduce input records = 5
Reduce output records = 5
Spilled Records = 10
Shuffled Maps = 2
Failed Shuffles = 0
Merged Map outputs = 2
GC time elapsed (ms) = 948
CPU time spent (ms) = 5160
Physical memory (bytes) snapshot = 47749120
Virtual memory (bytes) snapshot = 2899349504
Total committed heap usage (bytes) = 277684224
File Output Format Counters
Bytes Written = 40
ステップ8
次のコマンドを使用して、出力フォルダー内の結果のファイルを確認します。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
ステップ9
次のコマンドを使用して、の出力を確認します。 Part-00000 ファイル。このファイルはHDFSによって生成されます。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下は、MapReduceプログラムによって生成された出力です。
1981 34
1984 40
1985 45
ステップ10
次のコマンドは、分析のために出力フォルダーをHDFSからローカルファイルシステムにコピーするために使用されます。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
重要なコマンド
すべてのHadoopコマンドは、 $HADOOP_HOME/bin/hadoopコマンド。引数なしでHadoopスクリプトを実行すると、すべてのコマンドの説明が出力されます。
Usage − hadoop [--configconfdir]コマンド
次の表に、使用可能なオプションとその説明を示します。
シニア番号 | オプションと説明 |
---|---|
1 | namenode -format DFSファイルシステムをフォーマットします。 |
2 | secondarynamenode DFSセカンダリネームノードを実行します。 |
3 | namenode DFSネームノードを実行します。 |
4 | datanode DFSデータノードを実行します。 |
5 | dfsadmin DFS管理クライアントを実行します。 |
6 | mradmin Map-Reduce管理クライアントを実行します。 |
7 | fsck DFSファイルシステムチェックユーティリティを実行します。 |
8 | fs 汎用ファイルシステムユーザークライアントを実行します。 |
9 | balancer クラスタバランシングユーティリティを実行します。 |
10 | oiv オフラインのfsimageビューアをfsimageに適用します。 |
11 | fetchdt NameNodeから委任トークンを取得します。 |
12 | jobtracker MapReduceジョブトラッカーノードを実行します。 |
13 | pipes パイプジョブを実行します。 |
14 | tasktracker MapReduceタスクトラッカーノードを実行します。 |
15 | historyserver ジョブ履歴サーバーをスタンドアロンデーモンとして実行します。 |
16 | job MapReduceジョブを操作します。 |
17 | queue JobQueuesに関する情報を取得します。 |
18 | version バージョンを印刷します。 |
19 | jar <jar> jarファイルを実行します。 |
20 | distcp <srcurl> <desturl> ファイルまたはディレクトリを再帰的にコピーします。 |
21 | distcp2 <srcurl> <desturl> DistCpバージョン2。 |
22 | archive -archiveName NAME -p <parent path> <src>* <dest> Hadoopアーカイブを作成します。 |
23 | classpath Hadoopjarと必要なライブラリを取得するために必要なクラスパスを出力します。 |
24 | daemonlog 各デーモンのログレベルを取得/設定します |
MapReduceジョブと対話する方法
使用法-hadoopジョブ[GENERIC_OPTIONS]
以下は、Hadoopジョブで使用できる汎用オプションです。
シニア番号 | GENERIC_OPTION&説明 |
---|---|
1 | -submit <job-file> ジョブを送信します。 |
2 | -status <job-id> マップを印刷し、完了率とすべてのジョブカウンターを減らします。 |
3 | -counter <job-id> <group-name> <countername> カウンタ値を出力します。 |
4 | -kill <job-id> ジョブを強制終了します。 |
5 | -events <job-id> <fromevent-#> <#-of-events> 指定された範囲でjobtrackerが受信したイベントの詳細を出力します。 |
6 | -history [all] <jobOutputDir> - history < jobOutputDir> ジョブの詳細、失敗および強制終了されたチップの詳細を出力します。[all]オプションを指定すると、成功したタスクや各タスクに対して行われたタスクの試行など、ジョブに関する詳細を表示できます。 |
7 | -list[all] すべてのジョブを表示します。-listには、まだ完了していないジョブのみが表示されます。 |
8 | -kill-task <task-id> タスクを強制終了します。強制終了されたタスクは、失敗した試行に対してカウントされません。 |
9 | -fail-task <task-id> タスクに失敗します。失敗したタスクは、失敗した試行に対してカウントされます。 |
10 | -set-priority <job-id> <priority> ジョブの優先度を変更します。許可される優先度の値は、VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOWです。 |
ジョブのステータスを確認するには
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
ジョブoutput-dirの履歴を表示するには
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
仕事を殺すために
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004