MapReduce-クイックガイド

MapReduceは、複数のノードでビッグデータを並行して処理できるアプリケーションを作成するためのプログラミングモデルです。MapReduceは、膨大な量の複雑なデータを分析するための分析機能を提供します。

ビッグデータとは?

ビッグデータは、従来のコンピューティング技術では処理できない大きなデータセットのコレクションです。たとえば、FacebookやYoutubeが日常的に収集および管理するために必要なデータの量は、ビッグデータのカテゴリに分類できます。ただし、ビッグデータは規模と量だけでなく、速度、多様性、量、複雑さの1つ以上の側面も含みます。

なぜMapReduceなのか?

従来のエンタープライズシステムには通常、データを保存および処理するための集中型サーバーがあります。次の図は、従来のエンタープライズシステムの概略図を示しています。従来のモデルは、大量のスケーラブルなデータを処理するのには確かに適していないため、標準のデータベースサーバーでは対応できません。さらに、集中型システムでは、複数のファイルを同時に処理する際にボトルネックが発生しすぎます。

Googleは、MapReduceと呼ばれるアルゴリズムを使用してこのボトルネックの問題を解決しました。MapReduceは、タスクを小さな部分に分割し、それらを多くのコンピューターに割り当てます。その後、結果は1つの場所で収集され、統合されて結果データセットが形成されます。

MapReduceはどのように機能しますか?

MapReduceアルゴリズムには、MapとReduceという2つの重要なタスクが含まれています。

  • マップタスクは、データセットを取得し、それを別のデータセットに変換します。ここで、個々の要素はタプル(キーと値のペア)に分割されます。

  • 削減タスクは、マップからの出力を入力として受け取り、それらのデータタプル(キーと値のペア)をより小さなタプルのセットに結合します。

削減タスクは、常にマップジョブの後に実行されます。

ここで、各フェーズを詳しく見て、それらの重要性を理解してみましょう。

  • Input Phase −ここには、入力ファイル内の各レコードを変換し、解析されたデータをキーと値のペアの形式でマッパーに送信するレコードリーダーがあります。

  • Map −マップはユーザー定義関数であり、一連のキーと値のペアを取得し、それぞれを処理して0個以上のキーと値のペアを生成します。

  • Intermediate Keys −マッパーによって生成されたキーと値のペアは、中間キーと呼ばれます。

  • Combiner−コンバイナーは、マップフェーズからの同様のデータを識別可能なセットにグループ化する一種のローカルレデューサーです。マッパーから中間キーを入力として受け取り、ユーザー定義コードを適用して、1つのマッパーの小さなスコープ内の値を集約します。これはメインのMapReduceアルゴリズムの一部ではありません。オプションです。

  • Shuffle and Sort−レデューサータスクは、シャッフルと並べ替えのステップから始まります。グループ化されたキーと値のペアを、Reducerが実行されているローカルマシンにダウンロードします。個々のキーと値のペアは、キーごとに大きなデータリストに並べ替えられます。データリストは、同等のキーをグループ化して、Reducerタスクでそれらの値を簡単に繰り返すことができるようにします。

  • Reducer−レデューサーは、グループ化されたキーと値のペアのデータを入力として受け取り、それぞれに対してレデューサー関数を実行します。ここでは、データをさまざまな方法で集約、フィルタリング、および組み合わせることができ、さまざまな処理が必要になります。実行が終了すると、最後のステップに0個以上のキーと値のペアが与えられます。

  • Output Phase −出力フェーズには、Reducer関数からの最終的なキーと値のペアを変換し、レコードライターを使用してファイルに書き込む出力フォーマッターがあります。

小さな図を使用して、Map&fReduceの2つのタスクを理解してみましょう-

MapReduce-例

MapReduceの力を理解するために、実際の例を見てみましょう。Twitterは1日あたり約5億のツイートを受信します。これは、1秒あたり約3000のツイートです。次の図は、ツイーターがMapReduceを使用してツイートを管理する方法を示しています。

図に示すように、MapReduceアルゴリズムは次のアクションを実行します-

  • Tokenize −ツイートをトークン化してトークンのマップにし、キーと値のペアとして書き込みます。

  • Filter −トークンのマップから不要な単語をフィルタリングし、フィルタリングされたマップをキーと値のペアとして書き込みます。

  • Count −単語ごとにトークンカウンターを生成します。

  • Aggregate Counters −同様のカウンター値の集計を管理可能な小さな単位に準備します。

MapReduceアルゴリズムには、MapとReduceという2つの重要なタスクが含まれています。

  • マップタスクは、マッパークラスを使用して実行されます
  • リデュースタスクは、リデューサークラスを使用して実行されます。

Mapperクラスは、入力を受け取り、トークン化し、マップして並べ替えます。Mapperクラスの出力は、Reducerクラスによる入力として使用されます。これにより、一致するペアが検索され、それらが削減されます。

MapReduceは、タスクを小さな部分に分割し、それらを複数のシステムに割り当てるためのさまざまな数学的アルゴリズムを実装しています。技術的には、MapReduceアルゴリズムは、Map&Reduceタスクをクラスター内の適切なサーバーに送信するのに役立ちます。

これらの数学的アルゴリズムには、次のものが含まれる場合があります。

  • Sorting
  • Searching
  • Indexing
  • TF-IDF

並べ替え

並べ替えは、データを処理および分析するための基本的なMapReduceアルゴリズムの1つです。MapReduceは、ソートアルゴリズムを実装して、マッパーからの出力キーと値のペアをキーで自動的にソートします。

  • ソートメソッドは、マッパークラス自体に実装されています。

  • シャッフルと並べ替えフェーズでは、マッパークラスの値をトークン化した後、 Context class(ユーザー定義クラス)は、一致する値のキーをコレクションとして収集します。

  • 同様のキーと値のペア(中間キー)を収集するために、Mapperクラスは RawComparator キーと値のペアをソートするクラス。

  • 特定のReducerの中間キーと値のペアのセットは、Hadoopによって自動的に並べ替えられ、Reducerに提示される前にキー値(K2、{V2、V2、…})を形成します。

検索中

検索は、MapReduceアルゴリズムで重要な役割を果たします。コンバイナーフェーズ(オプション)とレデューサーフェーズで役立ちます。例を使用して、検索がどのように機能するかを理解してみましょう。

次の例は、MapReduceが検索アルゴリズムを使用して、特定の従業員データセットで最高の給与を引き出している従業員の詳細を見つける方法を示しています。

  • A、B、C、Dの4つの異なるファイルに従業員データがあると仮定します。また、すべてのデータベーステーブルから従業員データを繰り返しインポートするため、4つのファイルすべてに重複する従業員レコードがあると仮定します。次の図を参照してください。

  • The Map phase各入力ファイルを処理し、キーと値のペア(<k、v>:<emp name、salary>)で従業員データを提供します。次の図を参照してください。

  • The combiner phase(検索手法)は、マップフェーズからの入力を、従業員名と給与のキーと値のペアとして受け入れます。検索手法を使用して、コンバイナーはすべての従業員の給与をチェックし、各ファイルで最も給与の高い従業員を見つけます。次のスニペットを参照してください。

<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary

if(v(second employee).salary > Max){
   Max = v(salary);
}

else{
   Continue checking;
}

期待される結果は次のとおりです-

<satish、26000>

<gopal、50000>

<キラン、45000>

<マニシャ、45000>

  • Reducer phase−各ファイルを作成すると、最も給与の高い従業員が見つかります。冗長性を回避するには、すべての<k、v>ペアを確認し、重複するエントリがある場合は削除します。同じアルゴリズムが、4つの入力ファイルからの4つの<k、v>ペアの間で使用されます。最終的な出力は次のようになります-

<gopal, 50000>

インデックス作成

通常、インデックスは特定のデータとそのアドレスを指すために使用されます。特定のマッパーの入力ファイルに対してバッチインデックスを実行します。

MapReduceで通常使用されるインデックス作成手法は、 inverted index.GoogleやBingなどの検索エンジンは、転置インデックス技術を使用しています。簡単な例を使用して、インデックス作成がどのように機能するかを理解してみましょう。

次のテキストは、転置インデックスの入力です。ここで、T [0]、T [1]、およびt [2]はファイル名であり、それらの内容は二重引用符で囲まれています。

T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"

インデックス作成アルゴリズムを適用すると、次の出力が得られます。

"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}

ここで「a」:{2}は、「a」という用語がT [2]ファイルに表示されることを意味します。同様に、「is」:{0、1、2}は、「is」という用語がファイルT [0]、T [1]、およびT [2]に現れることを意味します。

TF-IDF

TF-IDFは、Term Frequency-Inverse DocumentFrequencyの略であるテキスト処理アルゴリズムです。これは、一般的なWeb分析アルゴリズムの1つです。ここで、「頻度」という用語は、用語がドキュメントに出現する回数を指します。

期間頻度(TF)

これは、特定の用語がドキュメントで発生する頻度を測定します。これは、ドキュメントに単語が表示される回数を、そのドキュメントの単語の総数で割って計算されます。

TF(the) = (Number of times term the ‘the’ appears in a document) / (Total number of terms in the document)

逆ドキュメント頻度(IDF)

用語の重要性を測定します。これは、テキストデータベース内のドキュメントの数を、特定の用語が表示されているドキュメントの数で割って計算されます。

TFを計算する際、すべての用語は等しく重要であると見なされます。つまり、TFは、「is」、「a」、「what」などの通常の単語の用語頻度をカウントします。したがって、次の計算を行うことにより、まれな用語をスケールアップしながら、頻繁な用語を知る必要があります。

IDF(the) = log_e(Total number of documents / Number of documents with term ‘the’ in it).

アルゴリズムは、小さな例の助けを借りて以下に説明されています。

1000語を含むドキュメントを考えてみましょう。 hive50回出現します。のTFhive その場合、(50/1000)= 0.05です。

さて、1000万のドキュメントと単語があると仮定します hiveこれらの1000に表示されます。次に、IDFはlog(10,000,000 / 1,000)= 4として計算されます。

TF-IDFの重みは、これらの量の積-0.05×4 = 0.20です。

MapReduceは、Linuxフレーバーのオペレーティングシステムでのみ機能し、Hadoopフレームワークが組み込まれています。Hadoopフレームワークをインストールするには、次の手順を実行する必要があります。

JAVAインストールの確認

Hadoopをインストールする前に、Javaをシステムにインストールする必要があります。次のコマンドを使用して、システムにJavaがインストールされているかどうかを確認します。

$ java –version

Javaがすでにシステムにインストールされている場合は、次の応答が表示されます-

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

システムにJavaがインストールされていない場合は、以下の手順に従ってください。

Javaのインストール

ステップ1

次のリンクからJavaの最新バージョンをダウンロードします-このリンク。

ダウンロード後、ファイルを見つけることができます jdk-7u71-linux-x64.tar.gz ダウンロードフォルダにあります。

ステップ2

次のコマンドを使用して、jdk-7u71-linux-x64.gzの内容を抽出します。

$ cd Downloads/
$ ls jdk-7u71-linux-x64.gz $ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz

ステップ3

すべてのユーザーがJavaを使用できるようにするには、Javaを「/ usr / local /」の場所に移動する必要があります。ルートに移動し、次のコマンドを入力します-

$ su
password:
# mv jdk1.7.0_71 /usr/local/java
# exit

ステップ4

PATH変数とJAVA_HOME変数を設定するには、次のコマンドを〜/ .bashrcファイルに追加します。

export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin

現在実行中のシステムにすべての変更を適用します。

$ source ~/.bashrc

ステップ5

次のコマンドを使用して、Javaの代替を構成します-

# alternatives --install /usr/bin/java java usr/local/java/bin/java 2

# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2

# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2

# alternatives --set java usr/local/java/bin/java

# alternatives --set javac usr/local/java/bin/javac

# alternatives --set jar usr/local/java/bin/jar

コマンドを使用してインストールを確認します java -version ターミナルから。

Hadoopのインストールの確認

MapReduceをインストールする前に、Hadoopをシステムにインストールする必要があります。次のコマンドを使用して、Hadoopのインストールを確認しましょう-

$ hadoop version

Hadoopがすでにシステムにインストールされている場合は、次の応答が返されます-

Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

システムにHadoopがインストールされていない場合は、次の手順に進みます。

Hadoopのダウンロード

Apache SoftwareFoundationからHadoop2.4.1をダウンロードし、次のコマンドを使用してその内容を抽出します。

$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

疑似分散モードでのHadoopのインストール

次の手順は、Hadoop2.4.1を疑似分散モードでインストールするために使用されます。

ステップ1-Hadoopのセットアップ

以下のコマンドを〜/ .bashrcファイルに追加することで、Hadoop環境変数を設定できます。

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

現在実行中のシステムにすべての変更を適用します。

$ source ~/.bashrc

ステップ2-Hadoop構成

すべてのHadoop構成ファイルは、「$ HADOOP_HOME / etc / hadoop」の場所にあります。Hadoopインフラストラクチャに応じて、これらの構成ファイルに適切な変更を加える必要があります。

$ cd $HADOOP_HOME/etc/hadoop

Javaを使用してHadoopプログラムを開発するには、でJava環境変数をリセットする必要があります。 hadoop-env.sh JAVA_HOME値をシステム内のJavaの場所に置き換えてファイルを作成します。

export JAVA_HOME=/usr/local/java

Hadoopを構成するには、次のファイルを編集する必要があります-

  • core-site.xml
  • hdfs-site.xml
  • yarn-site.xml
  • mapred-site.xml

core-site.xml

core-site.xmlには次の情報が含まれています-

  • Hadoopインスタンスに使用されるポート番号
  • ファイルシステムに割り当てられたメモリ
  • データを保存するためのメモリ制限
  • 読み取り/書き込みバッファのサイズ

core-site.xmlを開き、<configuration>タグと</ configuration>タグの間に次のプロパティを追加します。

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000 </value>
   </property>
</configuration>

hdfs-site.xml

hdfs-site.xmlには次の情報が含まれています-

  • レプリケーションデータの価値
  • ネームノードパス
  • ローカルファイルシステムのデータノードパス(Hadoopインフラストラクチャを保存する場所)

以下のデータを想定します。

dfs.replication (data replication value) = 1

(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode

このファイルを開き、<configuration>、</ configuration>タグの間に次のプロパティを追加します。

<configuration>

   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   
   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
   </property>
   
   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
   </property>
   
</configuration>

Note −上記のファイルでは、すべてのプロパティ値がユーザー定義であり、Hadoopインフラストラクチャに応じて変更を加えることができます。

糸-site.xml

このファイルは、Hadoopにyarnを構成するために使用されます。ヤーンサイト.xmlファイルを開き、<configuration>、</ configuration>タグの間に次のプロパティを追加します。

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

このファイルは、使用しているMapReduceフレームワークを指定するために使用されます。デフォルトでは、Hadoopにはyarn-site.xmlのテンプレートが含まれています。まず、次のコマンドを使用して、ファイルをmapred-site.xml.templateからmapred-site.xmlファイルにコピーする必要があります。

$ cp mapred-site.xml.template mapred-site.xml

mapred-site.xmlファイルを開き、<configuration>、</ configuration>タグの間に次のプロパティを追加します。

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

Hadoopのインストールの確認

次の手順は、Hadoopのインストールを確認するために使用されます。

ステップ1-名前ノードの設定

次のようにコマンド「hdfsnamenode-format」を使用してnamenodeを設定します-

$ cd ~ $ hdfs namenode -format

期待される結果は次のとおりです-

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:

/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

ステップ2-Hadoopdfsの確認

次のコマンドを実行して、Hadoopファイルシステムを起動します。

$ start-dfs.sh

期待される出力は次のとおりです-

10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

ステップ3-糸スクリプトの検証

次のコマンドを使用して、yarnスクリプトを開始します。このコマンドを実行すると、yarnデーモンが起動します。

$ start-yarn.sh

期待される出力は次のとおりです-

starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

ステップ4-ブラウザでHadoopにアクセスする

Hadoopにアクセスするためのデフォルトのポート番号は50070です。ブラウザーでHadoopサービスを取得するには、次のURLを使用します。

http://localhost:50070/

次のスクリーンショットは、Hadoopブラウザーを示しています。

ステップ5-クラスターのすべてのアプリケーションを確認する

クラスタのすべてのアプリケーションにアクセスするためのデフォルトのポート番号は8088です。このサービスを使用するには、次のURLを使用してください。

http://localhost:8088/

次のスクリーンショットは、Hadoopクラスターブラウザーを示しています。

この章では、MapReduceプログラミングの操作に関係するクラスとそのメソッドを詳しく見ていきます。私たちは主に次のことに焦点を当てます-

  • JobContextインターフェース
  • ジョブクラス
  • マッパークラス
  • レデューサークラス

JobContextインターフェース

JobContextインターフェースは、すべてのクラスのスーパーインターフェースであり、MapReduceでさまざまなジョブを定義します。タスクの実行中にタスクに提供されるジョブの読み取り専用ビューを提供します。

以下は、JobContextインターフェースのサブインターフェースです。

S.No. サブインターフェイスの説明
1.1。 MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

マッパーに与えられるコンテキストを定義します。

2.2。 ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

レデューサーに渡されるコンテキストを定義します。

Jobクラスは、JobContextインターフェイスを実装するメインクラスです。

ジョブクラス

Jobクラスは、MapReduceAPIで最も重要なクラスです。これにより、ユーザーはジョブの構成、送信、実行の制御、および状態の照会を行うことができます。setメソッドは、ジョブが送信されるまでのみ機能し、その後はIllegalStateExceptionをスローします。

通常、ユーザーはアプリケーションを作成し、ジョブのさまざまな側面を説明してから、ジョブを送信してその進行状況を監視します。

これが仕事を提出する方法の例です-

// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

コンストラクター

以下は、Jobクラスのコンストラクターの要約です。

S.No コンストラクターの要約
1 Job()
2 Job(構成設定)
3 Job(構成設定、文字列jobName)

メソッド

Jobクラスの重要なメソッドのいくつかは次のとおりです-

S.No メソッドの説明
1 getJobName()

ユーザー指定のジョブ名。

2 getJobState()

ジョブの現在の状態を返します。

3 isComplete()

ジョブが終了したかどうかを確認します。

4 setInputFormatClass()

ジョブのInputFormatを設定します。

5 setJobName(String name)

ユーザー指定のジョブ名を設定します。

6 setOutputFormatClass()

ジョブの出力形式を設定します。

7 setMapperClass(Class)

ジョブのマッパーを設定します。

8 setReducerClass(Class)

ジョブのレデューサーを設定します。

9 setPartitionerClass(Class)

ジョブのパーティショナーを設定します。

10 setCombinerClass(Class)

ジョブのコンバイナを設定します。

マッパークラス

Mapperクラスは、Mapジョブを定義します。入力Key-Valueペアを一連の中間Key-Valueペアにマップします。マップは、入力レコードを中間レコードに変換する個々のタスクです。変換された中間レコードは、入力レコードと同じタイプである必要はありません。特定の入力ペアは、ゼロまたは多数の出力ペアにマップできます。

方法

mapMapperクラスの最も顕著なメソッドです。構文は以下に定義されています-

map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)

このメソッドは、入力分割のキーと値のペアごとに1回呼び出されます。

レデューサークラス

Reducerクラスは、MapReduceでReduceジョブを定義します。キーを共有する中間値のセットを、より小さな値のセットに減らします。レデューサーの実装は、JobContext.getConfiguration()メソッドを介してジョブの構成にアクセスできます。レデューサーには、シャッフル、ソート、リデュースの3つの主要なフェーズがあります。

  • Shuffle −レデューサーは、ネットワーク全体でHTTPを使用して、各マッパーからソートされた出力をコピーします。

  • Sort−フレームワークマージ-レデューサー入力をキーでソートします(異なるマッパーが同じキーを出力した可能性があるため)。シャッフルフェーズとソートフェーズは同時に発生します。つまり、出力がフェッチされている間、それらはマージされます。

  • Reduce −このフェーズでは、reduce(Object、Iterable、Context)メソッドが、ソートされた入力の<key、(値のコレクション)>ごとに呼び出されます。

方法

reduceReducerクラスの最も顕著なメソッドです。構文は以下に定義されています-

reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

このメソッドは、キーと値のペアのコレクションのキーごとに1回呼び出されます。

MapReduceは、コモディティハードウェアの大規模なクラスター上の大量のデータを信頼できる方法で処理するアプリケーションを作成するために使用されるフレームワークです。この章では、Javaを使用したHadoopフレームワークでのMapReduceの操作について説明します。

MapReduceアルゴリズム

一般に、MapReduceパラダイムは、実際のデータが存在するコンピューターにmap-reduceプログラムを送信することに基づいています。

  • MapReduceジョブ中に、HadoopはMapタスクとReduceタスクをクラスター内の適切なサーバーに送信します。

  • フレームワークは、タスクの発行、タスクの完了の確認、ノード間のクラスター周辺でのデータのコピーなど、データ受け渡しのすべての詳細を管理します。

  • ほとんどのコンピューティングは、ネットワークトラフィックを削減するローカルディスク上のデータを持つノードで実行されます。

  • 特定のタスクを完了した後、クラスターはデータを収集および削減して適切な結果を形成し、それをHadoopサーバーに送り返します。

入力と出力(Javaパースペクティブ)

MapReduceフレームワークは、キーと値のペアで動作します。つまり、フレームワークは、ジョブへの入力をキーと値のペアのセットとして表示し、キーと値のペアのセットを、おそらく異なるタイプのジョブの出力として生成します。

キークラスと値クラスはフレームワークによってシリアル化可能である必要があるため、書き込み可能なインターフェイスを実装する必要があります。さらに、主要なクラスは、フレームワークによる並べ替えを容易にするためにWritableComparableインターフェイスを実装する必要があります。

MapReduceジョブの入力形式と出力形式はどちらも、キーと値のペアの形式です。

(入力)<k1、v1>-> map-> <k2、v2>-> reduce-> <k3、v3>(出力)。

入力 出力
地図 <k1​​、v1> リスト(<k2、v2>)
減らす <k2、list(v2)> リスト(<k3、v3>)

MapReduceの実装

次の表は、組織の電力消費に関するデータを示しています。この表には、5年連続の月間電力消費量と年平均が含まれています。

1月 2月 3月 4月 五月 6月 7月 8月 9月 10月 11月 12月 平均
1979年 23 23 2 43 24 25 26 26 26 26 25 26 25
1980年 26 27 28 28 28 30 31 31 31 30 30 30 29
1981年 31 32 32 32 33 34 35 36 36 34 34 34 34
1984年 39 38 39 39 39 41 42 43 40 39 38 38 40
1985年 38 39 39 39 39 41 41 41 00 40 39 39 45

指定されたテーブルの入力データを処理して、最大使用年、最小使用年などを見つけるアプリケーションを作成する必要があります。このタスクは、必要な出力を生成するロジックを記述し、記述されたアプリケーションにデータを渡すだけなので、レコード数が限られているプログラマーにとっては簡単です。

ここで、入力データのスケールを上げてみましょう。特定の州のすべての大規模産業の電力消費量を分析する必要があると仮定します。このようなバルクデータを処理するアプリケーションを作成する場合、

  • 実行にはかなりの時間がかかります。

  • データをソースからネットワークサーバーに移動すると、ネットワークトラフィックが大量に発生します。

これらの問題を解決するために、MapReduceフレームワークがあります。

入力データ

上記のデータは次のように保存されます sample.txtそして入力として与えられます。入力ファイルは次のようになります。

1979年 23 23 2 43 24 25 26 26 26 26 25 26 25
1980年 26 27 28 28 28 30 31 31 31 30 30 30 29
1981年 31 32 32 32 33 34 35 36 36 34 34 34 34
1984年 39 38 39 39 39 41 42 43 40 39 38 38 40
1985年 38 39 39 39 39 41 41 41 00 40 39 39 45

サンプルプログラム

サンプルデータの次のプログラムは、MapReduceフレームワークを使用しています。

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

上記のプログラムをに保存します ProcessUnits.java。プログラムのコンパイルと実行を以下に示します。

ProcessUnitsプログラムのコンパイルと実行

Hadoopユーザーのホームディレクトリ(例:/ home / hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

Step 1 −次のコマンドを使用して、コンパイルされたJavaクラスを格納するディレクトリを作成します。

$ mkdir units

Step 2−MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。mvnrepository.comからjarファイルをダウンロードします。ダウンロードフォルダが/ home / hadoop /であると仮定します。

Step 3 −以下のコマンドを使用して、 ProcessUnits.java プログラムとプログラムのjarファイルを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 −次のコマンドを使用して、HDFSに入力ディレクトリを作成します。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 −次のコマンドを使用して、という名前の入力ファイルをコピーします。 sample.txt HDFSの入力ディレクトリにあります。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 −次のコマンドを使用して、入力ディレクトリ内のファイルを確認します

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 −次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してEleunit_maxアプリケーションを実行します。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。実行後、出力にはいくつかの入力分割、マップタスク、レデューサータスクなどが含まれます。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 −次のコマンドを使用して、出力フォルダー内の結果ファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 −次のコマンドを使用して、の出力を確認します。 Part-00000ファイル。このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下は、MapReduceプログラムによって生成された出力です-

1981年 34
1984年 40
1985年 45

Step 10 −次のコマンドを使用して、出力フォルダーをHDFSからローカルファイルシステムにコピーします。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop

パーティショナーは、入力データセットを処理する際の条件のように機能します。パーティションフェーズは、マップフェーズの後、リデュースフェーズの前に実行されます。

パーティショナーの数は、レデューサーの数と同じです。つまり、パーティショナーはレデューサーの数に応じてデータを分割します。したがって、単一のパーティショナーから渡されたデータは、単一のレデューサーによって処理されます。

パーティショナー

パーティショナーは、中間Map出力のキーと値のペアをパーティション化します。ハッシュ関数のように機能するユーザー定義の条件を使用してデータを分割します。パーティションの総数は、ジョブのレデューサータスクの数と同じです。パーティショナーがどのように機能するかを理解するために例を見てみましょう。

MapReduceパーティショナーの実装

便宜上、次のデータを持つEmployeeという小さなテーブルがあると仮定します。このサンプルデータを入力データセットとして使用して、パーティショナーがどのように機能するかを示します。

Id 名前 年齢 性別 給料
1201 ゴパル 45 男性 50,000
1202 マニシャ 40 女性 50,000
1203 カリル 34 男性 30,000
1204 プラシャーント 30 男性 30,000
1205 キラン 20 男性 40,000
1206 ラクシュミ 25 女性 35,000
1207 bhavya 20 女性 15,000
1208 レシュマ 19 女性 15,000
1209 クランティ 22 男性 22,000
1210 サティッシュ 24 男性 25,000
1211 クリシュナ 25 男性 25,000
1212 アーシャッド 28 男性 20,000
1213 ラヴァニャ 18 女性 8,000

入力データセットを処理して、さまざまな年齢層(たとえば、20歳未満、21歳から30歳、30歳以上)の性別で最も給与の高い従業員を見つけるアプリケーションを作成する必要があります。

入力データ

上記のデータは次のように保存されます input.txt 「/ home / hadoop / hadoopPartitioner」ディレクトリにあり、入力として指定されます。

1201 ゴパル 45 男性 50000
1202 マニシャ 40 女性 51000
1203 khaleel 34 男性 30000
1204 プラシャーント 30 男性 31000
1205 キラン 20 男性 40000
1206 ラクシュミ 25 女性 35000
1207 bhavya 20 女性 15000
1208 レシュマ 19 女性 14000
1209 クランティ 22 男性 22000
1210 サティッシュ 24 男性 25000
1211 クリシュナ 25 男性 26000
1212 アーシャッド 28 男性 20000
1213 ラヴァニャ 18 女性 8000

与えられた入力に基づいて、以下はプログラムのアルゴリズムの説明です。

マップタスク

マップタスクは、テキストファイルにテキストデータがある間、入力としてキーと値のペアを受け入れます。このマップタスクの入力は次のとおりです-

Input −キーは「任意の特殊キー+ファイル名+行番号」(例:key = @ input1)などのパターンであり、値はその行のデータ(例:value = 1201 \ t gopal \ t 45 \ t男性\ t 50000)。

Method −このマップタスクの操作は次のとおりです。

  • 読む value (レコードデータ)。これは、文字列の引数リストからの入力値として提供されます。

  • split関数を使用して、性別を分離し、文字列変数に格納します。

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • 性別情報と記録データを送信します value マップタスクからの出力キーと値のペアとして partition task

context.write(new Text(gender), new Text(value));
  • テキストファイル内のすべてのレコードについて、上記のすべての手順を繰り返します。

Output −性別データとレコードデータ値をキーと値のペアとして取得します。

パーティショナータスク

パーティショナータスクは、マップタスクからのキーと値のペアを入力として受け入れます。パーティションとは、データをセグメントに分割することを意味します。パーティションの指定された条件付き基準に従って、入力されたキーと値のペアのデータは、年齢基準に基づいて3つの部分に分割できます。

Input −キーと値のペアのコレクション内のデータ全体。

key =レコード内の性別フィールド値。

value =その性別の全レコードデータ値。

Method −パーティションロジックのプロセスは次のように実行されます。

  • 入力キーと値のペアから年齢フィールドの値を読み取ります。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 以下の条件で年齢値を確認してください。

    • 20歳以下の年齢
    • 20歳以上30歳以下の年齢。
    • 30歳以上。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output−キーと値のペアのデータ全体は、キーと値のペアの3つのコレクションに分割されます。レデューサーは、各コレクションで個別に機能します。

タスクを減らす

パーティショナータスクの数は、レデューサータスクの数と同じです。ここでは、3つのパーティショナータスクがあるため、実行する3つのレデューサータスクがあります。

Input −レデューサーは、キーと値のペアの異なるコレクションで3回実行されます。

key =レコード内の性別フィールド値。

値=その性別のレコードデータ全体。

Method −次のロジックが各コレクションに適用されます。

  • 各レコードのSalaryフィールド値を読み取ります。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • max変数で給与を確認してください。str [4]が最大給与の場合は、str [4]をmaxに割り当てます。それ以外の場合は、手順をスキップします。

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • キーコレクションごとに手順1と2を繰り返します(男性と女性がキーコレクションです)。これらの3つの手順を実行すると、男性のキーコレクションから最大給与が1つ、女性のキーコレクションから最大給与が1つ見つかります。

context.write(new Text(key), new IntWritable(max));

Output−最後に、異なる年齢グループの3つのコレクションのKey-Valueペアデータのセットを取得します。各年齢層の男性コレクションの最高給与と女性コレクションの最高給与がそれぞれ含まれています。

Map、Partitioner、およびReduceタスクを実行した後、Key-Valueペアデータの3つのコレクションは、出力として3つの異なるファイルに保存されます。

3つのタスクはすべてMapReduceジョブとして扱われます。これらのジョブの次の要件と仕様は、構成で指定する必要があります-

  • 職種名
  • キーと値の入力および出力形式
  • Map、Reduce、Partitionerタスクの個々のクラス
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

サンプルプログラム

次のプログラムは、MapReduceプログラムで特定の基準のパーティショナーを実装する方法を示しています。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

上記のコードを次のように保存します PartitionerExample.java「/ home / hadoop / hadoopPartitioner」にあります。プログラムのコンパイルと実行を以下に示します。

コンパイルと実行

Hadoopユーザーのホームディレクトリ(たとえば、/ home / hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

Step 1−MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。あなたはmvnrepository.comからjarファイルをダウンロードすることができます。

ダウンロードしたフォルダーが「/ home / hadoop / hadoopPartitioner」であると仮定します。

Step 2 −プログラムのコンパイルには以下のコマンドを使用します PartitionerExample.java プログラムのjarファイルを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .

Step 3 −次のコマンドを使用して、HDFSに入力ディレクトリを作成します。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 −次のコマンドを使用して、という名前の入力ファイルをコピーします input.txt HDFSの入力ディレクトリにあります。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 −次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 −次のコマンドを使用して、入力ディレクトリから入力ファイルを取得して、トップ給与アプリケーションを実行します。

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

ファイルが実行されるまでしばらく待ちます。実行後、出力にはいくつかの入力分割、マップタスク、およびレデューサータスクが含まれます。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 −次のコマンドを使用して、出力フォルダー内の結果のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

プログラムで3つのパーティショナーと3つのレデューサーを使用しているため、出力は3つのファイルにあります。

Step 8 −次のコマンドを使用して、の出力を確認します。 Part-00000ファイル。このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

次のコマンドを使用して、の出力を確認します。 Part-00001 ファイル。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

次のコマンドを使用して、の出力を確認します。 Part-00002 ファイル。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000

コンバイナー、別名 semi-reducer, は、Mapクラスからの入力を受け入れ、その後、出力キーと値のペアをReducerクラスに渡すことによって動作するオプションのクラスです。

コンバイナーの主な機能は、同じキーでマップ出力レコードを要約することです。コンバイナーの出力(Key-Valueコレクション)は、ネットワークを介して実際のレデューサータスクに入力として送信されます。

コンバイナー

Combinerクラスは、MapクラスとReduceクラスの間で使用され、MapとReduceの間のデータ転送の量を減らします。通常、mapタスクの出力は大きく、reduceタスクに転送されるデータは高くなります。

次のMapReduceタスク図は、COMBINERPHASEを示しています。

コンバイナーはどのように機能しますか?

MapReduceコンバイナーがどのように機能するかについての簡単な要約は次のとおりです-

  • コンバイナには事前定義されたインターフェイスがなく、Reducerインターフェイスのreduce()メソッドを実装する必要があります。

  • コンバイナは、各マップ出力キーで動作します。これは、Reducerクラスと同じ出力Key-Valueタイプを持っている必要があります。

  • コンバイナーは、元のMap出力を置き換えるため、大きなデータセットから要約情報を生成できます。

ただし、Combinerはオプションですが、Reduceフェーズでデータを複数のグループに分離するのに役立ち、処理が容易になります。

MapReduceコンバイナーの実装

次の例は、コンバイナに関する理論的な考え方を示しています。次の名前の入力テキストファイルがあると仮定します。input.txt MapReduceの場合。

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

Combinerを使用したMapReduceプログラムの重要なフェーズについて以下で説明します。

レコードリーダー

これはMapReduceの最初のフェーズであり、レコードリーダーは入力テキストファイルからすべての行をテキストとして読み取り、キーと値のペアとして出力を生成します。

Input −入力ファイルからの行ごとのテキスト。

Output−キーと値のペアを形成します。以下は、予想されるキーと値のペアのセットです。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

マップフェーズ

マップフェーズは、レコードリーダーから入力を受け取り、それを処理して、キーと値のペアの別のセットとして出力を生成します。

Input −次のキーと値のペアは、レコードリーダーから取得した入力です。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

マップフェーズでは、各キーと値のペアを読み取り、StringTokenizerを使用して各単語を値から分割し、各単語をキーとして扱い、その単語の数を値として扱います。次のコードスニペットは、Mapperクラスとmap関数を示しています。

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

Output −期待される出力は次のとおりです−

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

コンバイナーフェーズ

コンバイナーフェーズは、マップフェーズから各キーと値のペアを取得して処理し、次のように出力を生成します。 key-value collection ペア。

Input −次のキーと値のペアは、マップフェーズから取得した入力です。

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

コンバイナーフェーズでは、各キーと値のペアを読み取り、一般的な単語をキーとして結合し、値をコレクションとして結合します。通常、コンバイナーのコードと操作はレデューサーのコードと操作に似ています。以下は、Mapper、Combiner、Reducerクラス宣言のコードスニペットです。

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

Output −期待される出力は次のとおりです−

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

レデューサーフェーズ

レデューサーフェーズは、コンバイナーフェーズから各キーと値の収集ペアを取得して処理し、出力をキーと値のペアとして渡します。コンバイナの機能はレデューサと同じであることに注意してください。

Input −次のキーと値のペアは、コンバイナーフェーズから取得した入力です。

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

レデューサーフェーズは、各キーと値のペアを読み取ります。以下は、コンバイナーのコードスニペットです。

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

Output −レデューサーフェーズからの期待される出力は次のとおりです。

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

レコードライター

これはMapReduceの最後のフェーズであり、レコードライターはReducerフェーズからすべてのキーと値のペアを書き込み、出力をテキストとして送信します。

Input −出力形式とともにReducerフェーズからの各Key-Valueペア。

Output−キーと値のペアをテキスト形式で提供します。以下は期待される出力です。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

サンプルプログラム

次のコードブロックは、プログラム内の単語数をカウントします。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
		
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
		
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
		
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

上記のプログラムを次のように保存します WordCount.java。プログラムのコンパイルと実行を以下に示します。

コンパイルと実行

Hadoopユーザーのホームディレクトリ(たとえば、/ home / hadoop)にいると仮定します。

上記のプログラムをコンパイルして実行するには、以下の手順に従ってください。

Step 1 −次のコマンドを使用して、コンパイルされたJavaクラスを格納するディレクトリを作成します。

$ mkdir units

Step 2−MapReduceプログラムのコンパイルと実行に使用されるHadoop-core-1.2.1.jarをダウンロードします。あなたはmvnrepository.comからjarファイルをダウンロードすることができます。

ダウンロードしたフォルダーが/ home / hadoop /であると仮定します。

Step 3 −次のコマンドを使用して、 WordCount.java プログラムとプログラムのjarファイルを作成します。

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

Step 4 −次のコマンドを使用して、HDFSに入力ディレクトリを作成します。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 −次のコマンドを使用して、という名前の入力ファイルをコピーします input.txt HDFSの入力ディレクトリにあります。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

Step 6 −次のコマンドを使用して、入力ディレクトリ内のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 −次のコマンドを使用して、入力ディレクトリから入力ファイルを取得してワードカウントアプリケーションを実行します。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

ファイルが実行されるまでしばらく待ちます。実行後、出力にはいくつかの入力分割、マップタスク、およびレデューサータスクが含まれます。

Step 8 −次のコマンドを使用して、出力フォルダー内の結果のファイルを確認します。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 −次のコマンドを使用して、の出力を確認します。 Part-00000ファイル。このファイルはHDFSによって生成されます。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下は、MapReduceプログラムによって生成された出力です。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

この章では、HDFSとMapReduceの両方の管理を含むHadoop管理について説明します。

  • HDFS管理には、HDFSファイルの構造、場所、および更新されたファイルの監視が含まれます。

  • MapReduceの管理には、アプリケーションのリスト、ノードの構成、アプリケーションのステータスなどの監視が含まれます。

HDFSモニタリング

HDFS(Hadoop分散ファイルシステム)には、ユーザーディレクトリ、入力ファイル、および出力ファイルが含まれています。MapReduceコマンドを使用します。put そして get, 保存および取得用。

「/ $ HADOOP_HOME / sbin」でコマンド「start-all.sh」を渡してHadoopフレームワーク(デーモン)を起動した後、次のURLをブラウザ「http:// localhost:50070」に渡します。ブラウザに次の画面が表示されます。

次のスクリーンショットは、ブラウズHDFSをブラウズする方法を示しています。

次のスクリーンショットは、HDFSのファイル構造を示しています。「/ user / hadoop」ディレクトリ内のファイルが表示されます。

次のスクリーンショットは、クラスター内のデータノード情報を示しています。ここでは、構成と容量を備えた1つのノードを見つけることができます。

MapReduceジョブモニタリング

MapReduceアプリケーションは、ジョブのコレクションです(Mapジョブ、Combiner、Partitioner、およびReduceジョブ)。以下を監視および維持することが必須です-

  • アプリケーションが適しているデータノードの構成。
  • アプリケーションごとに使用されるデータノードとリソースの数。

これらすべてを監視するには、ユーザーインターフェイスが必要です。「/ $ HADOOP_HOME / sbin」でコマンド「start-all.sh」を渡してHadoopフレームワークを起動した後、次のURLをブラウザ「http:// localhost:8080」に渡します。ブラウザに次の画面が表示されます。

上のスクリーンショットでは、ハンドポインターはアプリケーションID上にあります。それをクリックするだけで、ブラウザに次の画面が表示されます。それは次のことを説明しています-

  • 現在のアプリケーションが実行されているユーザー

  • アプリケーション名

  • そのアプリケーションの種類

  • 現在のステータス、最終ステータス

  • アプリケーションの開始時間、経過(完了時間)、監視時に完了した場合

  • このアプリケーションの履歴、つまりログ情報

  • そして最後に、ノード情報、つまりアプリケーションの実行に参加したノード。

次のスクリーンショットは、特定のアプリケーションの詳細を示しています-

次のスクリーンショットは、現在実行中のノード情報を示しています。ここでは、スクリーンショットにはノードが1つだけ含まれています。ハンドポインタは、実行中のノードのローカルホストアドレスを示します。