MapReduce-Hadoop 구현

MapReduce는 대규모 상용 하드웨어 클러스터에있는 대용량 데이터를 안정적인 방식으로 처리하는 애플리케이션을 작성하는 데 사용되는 프레임 워크입니다. 이 장에서는 Java를 사용하여 Hadoop 프레임 워크에서 MapReduce를 작동하는 방법에 대해 설명합니다.

MapReduce 알고리즘

일반적으로 MapReduce 패러다임은 실제 데이터가있는 컴퓨터로 맵 축소 프로그램을 보내는 것을 기반으로합니다.

  • MapReduce 작업 중에 Hadoop은 Map 및 Reduce 작업을 클러스터의 적절한 서버로 보냅니다.

  • 프레임 워크는 작업 실행, 작업 완료 확인 및 노드 간 클러스터 주변의 데이터 복사와 같은 데이터 전달의 모든 세부 정보를 관리합니다.

  • 대부분의 컴퓨팅은 네트워크 트래픽을 줄이는 로컬 디스크의 데이터와 함께 노드에서 발생합니다.

  • 주어진 작업을 완료 한 후 클러스터는 적절한 결과를 형성하기 위해 데이터를 수집 및 축소하고 다시 Hadoop 서버로 보냅니다.

입력 및 출력 (Java Perspective)

MapReduce 프레임 워크는 키-값 쌍에서 작동합니다. 즉, 프레임 워크는 작업에 대한 입력을 키-값 쌍의 집합으로보고 다른 유형의 작업 출력으로 키-값 쌍 집합을 생성합니다.

키 및 값 클래스는 프레임 워크에서 직렬화 할 수 있어야하므로 쓰기 가능한 인터페이스를 구현해야합니다. 또한 키 클래스는 프레임 워크 별 정렬을 용이하게하기 위해 WritableComparable 인터페이스를 구현해야합니다.

MapReduce 작업의 입력 및 출력 형식은 모두 키-값 쌍의 형식입니다.

(입력) <k1, v1>-> map-> <k2, v2>-> reduce-> <k3, v3> (출력).

입력 산출
지도 <k1, v1> 목록 (<k2, v2>)
줄이다 <k2, 목록 (v2)> 목록 (<k3, v3>)

MapReduce 구현

다음 표는 조직의 전력 소비에 관한 데이터를 보여줍니다. 이 표에는 5 년 연속 월간 전력 소비량과 연간 평균이 포함되어 있습니다.

1 월 2 월 망치다 4 월 할 수있다 6 월 7 월 8 월 9 월 10 월 11 월 12 월 평균
1979 년 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 년 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 년 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 년 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 년 38 39 39 39 39 41 41 41 00 40 39 39 45

최대 사용 연도, 최소 사용 연도 등을 찾기 위해 주어진 테이블의 입력 데이터를 처리하는 응용 프로그램을 작성해야합니다. 이 작업은 필요한 출력을 생성하는 논리를 작성하고 작성된 응용 프로그램에 데이터를 전달하기 때문에 제한된 양의 레코드를 가진 프로그래머에게 쉽습니다.

이제 입력 데이터의 규모를 높여 보겠습니다. 특정주의 모든 대규모 산업의 전력 소비를 분석해야한다고 가정합니다. 이러한 대량 데이터를 처리하기 위해 애플리케이션을 작성할 때

  • 실행하는 데 많은 시간이 걸립니다.

  • 소스에서 네트워크 서버로 데이터를 이동할 때 네트워크 트래픽이 많이 발생합니다.

이러한 문제를 해결하기 위해 MapReduce 프레임 워크가 있습니다.

입력 데이터

위의 데이터는 다음과 같이 저장됩니다. sample.txt입력으로 제공됩니다. 입력 파일은 아래와 같습니다.

1979 년 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 년 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 년 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 년 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 년 38 39 39 39 39 41 41 41 00 40 39 39 45

예제 프로그램

샘플 데이터에 대한 다음 프로그램은 MapReduce 프레임 워크를 사용합니다.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

위의 프로그램을 ProcessUnits.java. 프로그램의 컴파일 및 실행은 다음과 같습니다.

ProcessUnits 프로그램의 컴파일 및 실행

Hadoop 사용자의 홈 디렉토리 (예 : / home / hadoop)에 있다고 가정 해 보겠습니다.

위의 프로그램을 컴파일하고 실행하려면 아래 단계를 따르십시오.

Step 1 − 컴파일 된 자바 클래스를 저장할 디렉토리를 생성하려면 다음 명령을 사용하십시오.

$ mkdir units

Step 2− MapReduce 프로그램을 컴파일하고 실행하는 데 사용되는 Hadoop-core-1.2.1.jar를 다운로드합니다. mvnrepository.com 에서 jar를 다운로드합니다 . 다운로드 폴더가 / home / hadoop /이라고 가정하겠습니다.

Step 3 − 다음 명령을 사용하여 ProcessUnits.java 프로그램을 만들고 프로그램을위한 jar를 만듭니다.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 − 다음 명령은 HDFS에서 입력 디렉토리를 생성하는 데 사용됩니다.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 − 다음 명령은 이름이 지정된 입력 파일을 복사하는 데 사용됩니다. sample.txt HDFS의 입력 디렉토리에 있습니다.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 − 다음 명령은 입력 디렉토리의 파일을 확인하는 데 사용됩니다.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 − 다음 명령은 입력 디렉토리에서 입력 파일을 가져와 Eleunit_max 응용 프로그램을 실행하는 데 사용됩니다.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

파일이 실행될 때까지 잠시 기다리십시오. 실행 후 출력에는 여러 입력 분할, Map 작업, Reducer 작업 등이 포함됩니다.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 − 다음 명령은 출력 폴더에서 결과 파일을 확인하는 데 사용됩니다.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 − 다음 명령은 출력을 확인하는 데 사용됩니다. Part-00000파일. 이 파일은 HDFS에서 생성됩니다.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

다음은 MapReduce 프로그램에 의해 생성 된 출력입니다.

1981 년 34
1984 년 40
1985 년 45

Step 10 − 다음 명령은 HDFS에서 로컬 파일 시스템으로 출력 폴더를 복사하는 데 사용됩니다.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop