Hadoop-MapReduce

MapReduceは、コモディティハードウェアの大規模なクラスター上で、信頼性の高い方法で大量のデータを並行して処理するアプリケーションを作成するために使用できるフレームワークです。

MapReduceとは何ですか?

MapReduceは、Javaに基づく分散コンピューティングのための処理技術およびプログラムモデルです。MapReduceアルゴリズムには、MapとReduceという2つの重要なタスクが含まれています。Mapはデータのセットを取得し、それを別のデータのセットに変換します。ここで、個々の要素はタプル(キーと値のペア)に分割されます。次に、タスクを減らします。これは、マップからの出力を入力として受け取り、それらのデータタプルをより小さなタプルのセットに結合します。MapReduceという名前のシーケンスが示すように、reduceタスクは常にマップジョブの後に実行されます。

MapReduceの主な利点は、複数のコンピューティングノード間でデータ処理を簡単にスケーリングできることです。MapReduceモデルでは、データ処理プリミティブはマッパーおよびリデューサーと呼ばれます。データ処理アプリケーションをマッパーリデューサーに分解することは、簡単ではない場合があります。ただし、MapReduceフォームでアプリケーションを作成すると、クラスター内で数百、数千、さらには数万のマシンを実行するようにアプリケーションをスケーリングするだけで、構成を変更するだけです。この単純なスケーラビリティは、多くのプログラマーがMapReduceモデルを使用するように惹きつけてきたものです。

アルゴリズム

  • 一般に、MapReduceパラダイムは、データが存在する場所にコンピューターを送信することに基づいています。

  • MapReduceプログラムは、マップステージ、シャッフルステージ、リデュースステージの3つのステージで実行されます。

    • Map stage−マップまたはマッパーの仕事は、入力データを処理することです。通常、入力データはファイルまたはディレクトリの形式であり、Hadoopファイルシステム(HDFS)に保存されます。入力ファイルは、マッパー関数に1行ずつ渡されます。マッパーはデータを処理し、データのいくつかの小さなチャンクを作成します。

    • Reduce stage −この段階は、 Shuffle ステージと Reduceステージ。レデューサーの仕事は、マッパーからのデータを処理することです。処理後、新しい出力セットが生成され、HDFSに保存されます。

  • MapReduceジョブ中に、HadoopはMapタスクとReduceタスクをクラスター内の適切なサーバーに送信します。

  • フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター周辺でのデータのコピーなど、データ受け渡しのすべての詳細を管理します。

  • ほとんどのコンピューティングは、ネットワークトラフィックを削減するローカルディスク上のデータを持つノードで実行されます。

  • 指定されたタスクの完了後、クラスターはデータを収集および削減して適切な結果を形成し、それをHadoopサーバーに送り返します。

入力と出力(Javaパースペクティブ)

MapReduceフレームワークは<key、value>ペアで動作します。つまり、フレームワークはジョブへの入力を<key、value>ペアのセットとして表示し、<key、value>ペアのセットをジョブの出力として生成します。 、おそらく異なるタイプのもの。

キークラスと値クラスはフレームワークによってシリアル化されている必要があるため、書き込み可能なインターフェイスを実装する必要があります。さらに、キークラスは、フレームワークによる並べ替えを容易にするために、Writable-Comparableインターフェイスを実装する必要があります。の入力および出力タイプMapReduce job −(入力)<k1、v1>→map→<k2、v2>→reduce→<k3、v3>(出力)。

入力 出力
地図 <k1​​、v1> リスト(<k2、v2>)
減らす <k2、list(v2)> リスト(<k3、v3>)

用語

  • PayLoad −アプリケーションは、Map関数とReduce関数を実装し、ジョブの中核を形成します。

  • Mapper −マッパーは、入力キー/値ペアを中間キー/値ペアのセットにマップします。

  • NamedNode − Hadoop分散ファイルシステム(HDFS)を管理するノード。

  • DataNode −処理が行われる前にデータが事前に提示されるノード。

  • MasterNode − JobTrackerが実行され、クライアントからのジョブ要求を受け入れるノード。

  • SlaveNode − Map andReduceプログラムが実行されるノード。

  • JobTracker −ジョブをスケジュールし、タスクトラッカーへの割り当てジョブを追跡します。

  • Task Tracker −タスクを追跡し、ステータスをJobTrackerに報告します。

  • Job −プログラムは、データセット全体でのマッパーとリデューサーの実行です。

  • Task −データのスライスに対するマッパーまたはレデューサーの実行。

  • Task Attempt −SlaveNodeでタスクを実行しようとする特定のインスタンス。

シナリオ例

以下は、組織の電力消費に関するデータです。毎月の電力消費量とさまざまな年の年間平均が含まれています。

1月 2月 3月 4月 五月 6月 7月 8月 9月 10月 11月 12月 平均
1979年 23 23 2 43 24 25 26 26 26 26 25 26 25
1980年 26 27 28 28 28 30 31 31 31 30 30 30 29
1981年 31 32 32 32 33 34 35 36 36 34 34 34 34
1984年 39 38 39 39 39 41 42 43 40 39 38 38 40
1985年 38 39 39 39 39 41 41 41 00 40 39 39 45

上記のデータが入力として与えられた場合、それを処理し、最大使用年、最小使用年などの結果を生成するアプリケーションを作成する必要があります。これは、レコード数が有限のプログラマーにとってのウォークオーバーです。必要な出力を生成するロジックを記述し、書き込まれたアプリケーションにデータを渡すだけです。

しかし、特定の州のすべての大規模産業の設立以来の電力消費量を表すデータについて考えてみてください。

このようなバルクデータを処理するアプリケーションを作成する場合、

  • それらは実行するのに多くの時間がかかります。

  • ソースからネットワークサーバーなどにデータを移動すると、ネットワークトラフィックが大量に発生します。

これらの問題を解決するために、MapReduceフレームワークがあります。

入力データ

上記のデータは次のように保存されます sample.txtそして入力として与えられます。入力ファイルは次のようになります。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

サンプルプログラム

以下に、MapReduceフレームワークを使用したサンプルデータへのプログラムを示します。

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
}

上記のプログラムを次のように保存します ProcessUnits.java. プログラムのコンパイルと実行について以下に説明します。

プロセスユニットプログラムのコンパイルと実行

Hadoopユーザーのホームディレクトリ(例:/ home / hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

ステップ1

次のコマンドは、コンパイルされたJavaクラスを格納するディレクトリを作成することです。

$ mkdir units

ステップ2

ダウンロード Hadoop-core-1.2.1.jar,これは、MapReduceプログラムをコンパイルして実行するために使用されます。次のリンクmvnrepository.comにアクセスして、jarをダウンロードしてください。ダウンロードしたフォルダが/home/hadoop/.

ステップ3

次のコマンドは、コンパイルに使用されます ProcessUnits.java プログラムとプログラムのjarファイルを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ .

ステップ4

次のコマンドを使用して、HDFSに入力ディレクトリを作成します。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

ステップ5

次のコマンドを使用して、という名前の入力ファイルをコピーします sample.txtHDFSの入力ディレクトリにあります。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

ステップ6

次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

ステップ7

次のコマンドは、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行するために使用されます。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。実行後、以下に示すように、出力には、入力分割の数、マップタスクの数、レデューサータスクの数などが含まれます。

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40

ステップ8

次のコマンドを使用して、出力フォルダー内の結果のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

ステップ9

次のコマンドを使用して、の出力を確認します。 Part-00000 ファイル。このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下は、MapReduceプログラムによって生成された出力です。

1981    34 
1984    40 
1985    45

ステップ10

次のコマンドは、分析のために出力フォルダーをHDFSからローカルファイルシステムにコピーするために使用されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

重要なコマンド

すべてのHadoopコマンドは、 $HADOOP_HOME/bin/hadoopコマンド。引数なしでHadoopスクリプトを実行すると、すべてのコマンドの説明が出力されます。

Usage − hadoop [--configconfdir]コマンド

次の表に、使用可能なオプションとその説明を示します。

シニア番号 オプションと説明
1

namenode -format

DFSファイルシステムをフォーマットします。

2

secondarynamenode

DFSセカンダリネームノードを実行します。

3

namenode

DFSネームノードを実行します。

4

datanode

DFSデータノードを実行します。

5

dfsadmin

DFS管理クライアントを実行します。

6

mradmin

Map-Reduce管理クライアントを実行します。

7

fsck

DFSファイルシステムチェックユーティリティを実行します。

8

fs

汎用ファイルシステムユーザークライアントを実行します。

9

balancer

クラスタバランシングユーティリティを実行します。

10

oiv

オフラインのfsimageビューアをfsimageに適用します。

11

fetchdt

NameNodeから委任トークンを取得します。

12

jobtracker

MapReduceジョブトラッカーノードを実行します。

13

pipes

パイプジョブを実行します。

14

tasktracker

MapReduceタスクトラッカーノードを実行します。

15

historyserver

ジョブ履歴サーバーをスタンドアロンデーモンとして実行します。

16

job

MapReduceジョブを操作します。

17

queue

JobQueuesに関する情報を取得します。

18

version

バージョンを印刷します。

19

jar <jar>

jarファイルを実行します。

20

distcp <srcurl> <desturl>

ファイルまたはディレクトリを再帰的にコピーします。

21

distcp2 <srcurl> <desturl>

DistCpバージョン2。

22

archive -archiveName NAME -p <parent path> <src>* <dest>

Hadoopアーカイブを作成します。

23

classpath

Hadoopjarと必要なライブラリを取得するために必要なクラスパスを出力します。

24

daemonlog

各デーモンのログレベルを取得/設定します

MapReduceジョブと対話する方法

使用法-hadoopジョブ[GENERIC_OPTIONS]

以下は、Hadoopジョブで使用できる汎用オプションです。

シニア番号 GENERIC_OPTION&説明
1

-submit <job-file>

ジョブを送信します。

2

-status <job-id>

マップを印刷し、完了率とすべてのジョブカウンターを減らします。

3

-counter <job-id> <group-name> <countername>

カウンタ値を出力します。

4

-kill <job-id>

ジョブを強制終了します。

5

-events <job-id> <fromevent-#> <#-of-events>

指定された範囲でjobtrackerが受信したイベントの詳細を出力します。

6

-history [all] <jobOutputDir> - history < jobOutputDir>

ジョブの詳細、失敗および強制終了されたチップの詳細を出力します。[all]オプションを指定すると、成功したタスクや各タスクに対して行われたタスクの試行など、ジョブに関する詳細を表示できます。

7

-list[all]

すべてのジョブを表示します。-listには、まだ完了していないジョブのみが表示されます。

8

-kill-task <task-id>

タスクを強制終了します。強制終了されたタスクは、失敗した試行に対してカウントされません。

9

-fail-task <task-id>

タスクに失敗します。失敗したタスクは、失敗した試行に対してカウントされます。

10

-set-priority <job-id> <priority>

ジョブの優先度を変更します。許可される優先度の値は、VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOWです。

ジョブのステータスを確認するには

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

ジョブoutput-dirの履歴を表示するには

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

仕事を殺すために

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004