MapReduce - Triển khai Hadoop

MapReduce là một khuôn khổ được sử dụng để viết các ứng dụng nhằm xử lý khối lượng dữ liệu khổng lồ trên các cụm phần cứng hàng hóa lớn một cách đáng tin cậy. Chương này sẽ đưa bạn qua hoạt động của MapReduce trong khuôn khổ Hadoop sử dụng Java.

Thuật toán MapReduce

Nói chung, mô hình MapReduce dựa trên việc gửi các chương trình thu nhỏ bản đồ đến các máy tính có dữ liệu thực.

  • Trong một công việc MapReduce, Hadoop gửi các tác vụ Bản đồ và Rút gọn tới các máy chủ thích hợp trong cụm.

  • Khung quản lý tất cả các chi tiết của việc truyền dữ liệu như phát hành nhiệm vụ, xác minh việc hoàn thành nhiệm vụ và sao chép dữ liệu xung quanh cụm giữa các nút.

  • Hầu hết việc tính toán diễn ra trên các nút với dữ liệu trên các đĩa cục bộ làm giảm lưu lượng mạng.

  • Sau khi hoàn thành một nhiệm vụ nhất định, cụm thu thập và giảm dữ liệu để tạo thành một kết quả thích hợp và gửi nó trở lại máy chủ Hadoop.

Đầu vào và đầu ra (Phối cảnh Java)

Khung công tác MapReduce hoạt động trên các cặp khóa-giá trị, nghĩa là, khung công tác xem đầu vào của công việc như một tập hợp các cặp khóa-giá trị và tạo ra một tập hợp các cặp khóa-giá trị làm đầu ra của công việc, có thể hình dung ra các loại khác nhau.

Các lớp giá trị và khóa phải có thể tuần tự hóa bởi khuôn khổ và do đó, nó được yêu cầu để triển khai giao diện ghi. Ngoài ra, các lớp quan trọng phải triển khai giao diện có thể so sánh được để tạo thuận lợi cho việc sắp xếp theo khung.

Cả định dạng đầu vào và đầu ra của công việc MapReduce đều ở dạng các cặp khóa-giá trị -

(Đầu vào) <k1, v1> -> bản đồ -> <k2, v2> -> giảm -> <k3, v3> (Đầu ra).

Đầu vào Đầu ra
Bản đồ <k1, v1> danh sách (<k2, v2>)
Giảm <k2, list (v2)> danh sách (<k3, v3>)

Triển khai MapReduce

Bảng sau đây cho thấy dữ liệu về mức tiêu thụ điện của một tổ chức. Bảng này bao gồm mức tiêu thụ điện hàng tháng và mức trung bình hàng năm trong năm năm liên tiếp.

tháng một Tháng hai Mar Tháng tư có thể Tháng sáu Thg 7 Tháng 8 Tháng chín Tháng 10 Tháng 11 Tháng mười hai Trung bình
Năm 1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Chúng ta cần viết các ứng dụng để xử lý dữ liệu đầu vào trong bảng đã cho để tìm ra năm sử dụng tối đa, năm sử dụng tối thiểu, v.v. Nhiệm vụ này rất dễ dàng đối với các lập trình viên với số lượng bản ghi hữu hạn, vì họ chỉ cần viết logic để tạo ra kết quả đầu ra cần thiết và chuyển dữ liệu đến ứng dụng đã viết.

Bây giờ chúng ta hãy nâng quy mô của dữ liệu đầu vào. Giả sử chúng ta phải phân tích mức tiêu thụ điện của tất cả các ngành công nghiệp quy mô lớn của một bang cụ thể. Khi chúng tôi viết ứng dụng để xử lý dữ liệu hàng loạt như vậy,

  • Họ sẽ mất rất nhiều thời gian để thực hiện.

  • Sẽ có nhiều lưu lượng mạng khi chúng ta di chuyển dữ liệu từ nguồn đến máy chủ mạng.

Để giải quyết những vấn đề này, chúng tôi có khung MapReduce.

Dữ liệu đầu vào

Dữ liệu trên được lưu dưới dạng sample.txtvà được cung cấp dưới dạng đầu vào. Tệp đầu vào trông như hình dưới đây.

Năm 1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Chương trình mẫu

Chương trình sau cho dữ liệu mẫu sử dụng khung MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Lưu chương trình trên vào ProcessUnits.java. Việc biên dịch và thực hiện chương trình được đưa ra dưới đây.

Biên dịch và thực hiện chương trình ProcessUnits

Giả sử chúng tôi đang ở trong thư mục chính của người dùng Hadoop (ví dụ: / home / hadoop).

Làm theo các bước dưới đây để biên dịch và thực thi chương trình trên.

Step 1 - Sử dụng lệnh sau để tạo thư mục lưu các lớp java đã biên dịch.

$ mkdir units

Step 2- Tải xuống Hadoop-core-1.2.1.jar, được sử dụng để biên dịch và thực thi chương trình MapReduce. Tải xuống jar từ mvnrepository.com . Giả sử thư mục tải xuống là / home / hadoop /.

Step 3 - Các lệnh sau được sử dụng để biên dịch ProcessUnits.java chương trình và để tạo một jar cho chương trình.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - Lệnh sau dùng để tạo thư mục đầu vào trong HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Lệnh sau dùng để sao chép tệp đầu vào có tên sample.txt trong thư mục đầu vào của HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - Lệnh sau được sử dụng để xác minh các tệp trong thư mục đầu vào

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Lệnh sau được sử dụng để chạy ứng dụng Eleunit_max bằng cách lấy các tệp đầu vào từ thư mục đầu vào.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Chờ một lúc cho đến khi tệp được thực thi. Sau khi thực thi, đầu ra chứa một số phân tách đầu vào, tác vụ Bản đồ, tác vụ Bộ giảm tốc, v.v.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - Lệnh sau được sử dụng để xác minh các tệp kết quả trong thư mục đầu ra.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Lệnh sau được sử dụng để xem kết quả đầu ra trong Part-00000tập tin. Tệp này được tạo bởi HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Sau đây là đầu ra do chương trình MapReduce tạo ra:

1981 34
1984 40
1985 45

Step 10 - Lệnh sau được sử dụng để sao chép thư mục đầu ra từ HDFS vào hệ thống tệp cục bộ.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop