MapReduce - การใช้งาน Hadoop

MapReduce เป็นเฟรมเวิร์กที่ใช้สำหรับการเขียนแอพพลิเคชั่นเพื่อประมวลผลข้อมูลจำนวนมหาศาลบนกลุ่มฮาร์ดแวร์สินค้าขนาดใหญ่ในลักษณะที่เชื่อถือได้ บทนี้จะนำคุณไปสู่การทำงานของ MapReduce ใน Hadoop framework โดยใช้ Java

อัลกอริทึม MapReduce

โดยทั่วไปกระบวนทัศน์ MapReduce จะขึ้นอยู่กับการส่งโปรแกรมลดแผนที่ไปยังคอมพิวเตอร์ที่มีข้อมูลจริงอยู่

  • ระหว่างงาน MapReduce Hadoop จะส่งแผนที่และงานลดไปยังเซิร์ฟเวอร์ที่เหมาะสมในคลัสเตอร์

  • เฟรมเวิร์กจัดการรายละเอียดทั้งหมดของการส่งผ่านข้อมูลเช่นการออกงานการตรวจสอบความสมบูรณ์ของงานและการคัดลอกข้อมูลรอบคลัสเตอร์ระหว่างโหนด

  • การประมวลผลส่วนใหญ่เกิดขึ้นบนโหนดที่มีข้อมูลบนดิสก์ภายในซึ่งช่วยลดปริมาณการใช้งานเครือข่าย

  • หลังจากเสร็จสิ้นภารกิจที่กำหนดคลัสเตอร์จะรวบรวมและลดข้อมูลเพื่อสร้างผลลัพธ์ที่เหมาะสมและส่งกลับไปยังเซิร์ฟเวอร์ Hadoop

อินพุตและเอาต์พุต (Java Perspective)

เฟรมเวิร์ก MapReduce ทำงานบนคู่คีย์ - ค่ากล่าวคือเฟรมเวิร์กดูอินพุตของงานเป็นชุดของคู่คีย์ - ค่าและสร้างชุดของคู่คีย์ - ค่าเป็นเอาต์พุตของงานซึ่งอาจเป็นประเภทต่างๆ

คลาสคีย์และค่าจะต้องทำให้เป็นอนุกรมได้ตามกรอบงานและด้วยเหตุนี้จึงจำเป็นต้องใช้อินเทอร์เฟซที่เขียนได้ นอกจากนี้คลาสหลักต้องใช้อินเทอร์เฟซ WritableComparable เพื่ออำนวยความสะดวกในการจัดเรียงตามกรอบงาน

ทั้งรูปแบบอินพุตและเอาต์พุตของงาน MapReduce อยู่ในรูปแบบของคู่คีย์ - ค่า -

(อินพุต) <k1, v1> -> แผนที่ -> <k2, v2> -> ลด -> <k3, v3> (เอาต์พุต)

อินพุต เอาต์พุต
แผนที่ <k1, v1> รายการ (<k2, v2>)
ลด <k2 รายการ (v2)> รายการ (<k3, v3>)

การใช้ MapReduce

ตารางต่อไปนี้แสดงข้อมูลเกี่ยวกับการใช้ไฟฟ้าขององค์กร ตารางประกอบด้วยปริมาณการใช้ไฟฟ้ารายเดือนและค่าเฉลี่ยรายปีเป็นเวลาห้าปีติดต่อกัน

ม.ค. ก.พ. มี.ค. เม.ย. อาจ มิ.ย. ก.ค. ส.ค. ก.ย. ต.ค. พ.ย. ธ.ค. ค่าเฉลี่ย
พ.ศ. 2522 23 23 2 43 24 25 26 26 26 26 25 26 25
พ.ศ. 2523 26 27 28 28 28 30 31 31 31 30 30 30 29
พ.ศ. 2524 31 32 32 32 33 34 35 36 36 34 34 34 34
พ.ศ. 2527 39 38 39 39 39 41 42 43 40 39 38 38 40
พ.ศ. 2528 38 39 39 39 39 41 41 41 00 40 39 39 45

เราจำเป็นต้องเขียนแอปพลิเคชันเพื่อประมวลผลข้อมูลอินพุตในตารางที่กำหนดเพื่อค้นหาปีที่ใช้งานสูงสุดปีของการใช้งานขั้นต่ำและอื่น ๆ งานนี้เป็นเรื่องง่ายสำหรับโปรแกรมเมอร์ที่มีจำนวนบันทึก จำกัด เนื่องจากพวกเขาจะเขียนตรรกะเพื่อสร้างผลลัพธ์ที่ต้องการและส่งข้อมูลไปยังแอปพลิเคชันที่เขียนขึ้น

ตอนนี้ให้เราเพิ่มขนาดของข้อมูลอินพุต สมมติว่าเราต้องวิเคราะห์การใช้ไฟฟ้าของอุตสาหกรรมขนาดใหญ่ทั้งหมดของรัฐใดรัฐหนึ่ง เมื่อเราเขียนแอปพลิเคชันเพื่อประมวลผลข้อมูลจำนวนมากดังกล่าว

  • พวกเขาจะใช้เวลามากในการดำเนินการ

  • จะมีปริมาณการใช้งานเครือข่ายหนาแน่นเมื่อเราย้ายข้อมูลจากต้นทางไปยังเซิร์ฟเวอร์เครือข่าย

ในการแก้ปัญหาเหล่านี้เรามีกรอบ MapReduce

ป้อนข้อมูล

ข้อมูลข้างต้นถูกบันทึกเป็น sample.txtและกำหนดให้เป็นอินพุต ไฟล์อินพุตมีลักษณะดังที่แสดงด้านล่าง

พ.ศ. 2522 23 23 2 43 24 25 26 26 26 26 25 26 25
พ.ศ. 2523 26 27 28 28 28 30 31 31 31 30 30 30 29
พ.ศ. 2524 31 32 32 32 33 34 35 36 36 34 34 34 34
พ.ศ. 2527 39 38 39 39 39 41 42 43 40 39 38 38 40
พ.ศ. 2528 38 39 39 39 39 41 41 41 00 40 39 39 45

ตัวอย่างโปรแกรม

โปรแกรมต่อไปนี้สำหรับข้อมูลตัวอย่างใช้กรอบ MapReduce

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

บันทึกโปรแกรมข้างต้นลงใน ProcessUnits.java. การรวบรวมและการดำเนินการของโปรแกรมมีดังต่อไปนี้

การคอมไพล์และการดำเนินการของโปรแกรม ProcessUnits

สมมติว่าเราอยู่ในโฮมไดเร็กทอรีของผู้ใช้ Hadoop (เช่น / home / hadoop)

ทำตามขั้นตอนด้านล่างเพื่อคอมไพล์และรันโปรแกรมข้างต้น

Step 1 - ใช้คำสั่งต่อไปนี้เพื่อสร้างไดเร็กทอรีเพื่อจัดเก็บคลาส java ที่คอมไพล์

$ mkdir units

Step 2- ดาวน์โหลด Hadoop-core-1.2.1.jar ซึ่งใช้ในการคอมไพล์และรันโปรแกรม MapReduce ดาวน์โหลดขวดจากmvnrepository.com ให้เราถือว่าโฟลเดอร์ดาวน์โหลดคือ / home / hadoop /

Step 3 - คำสั่งต่อไปนี้ใช้เพื่อรวบรวมไฟล์ ProcessUnits.java โปรแกรมและสร้าง jar สำหรับโปรแกรม

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - คำสั่งต่อไปนี้ใช้เพื่อสร้างไดเร็กทอรีอินพุตใน HDFS

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - คำสั่งต่อไปนี้ใช้เพื่อคัดลอกไฟล์อินพุตที่ชื่อ sample.txt ในไดเร็กทอรีอินพุตของ HDFS

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - คำสั่งต่อไปนี้ใช้เพื่อตรวจสอบไฟล์ในไดเร็กทอรีอินพุต

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - คำสั่งต่อไปนี้ใช้เพื่อรันแอปพลิเคชัน Eleunit_max โดยรับไฟล์อินพุตจากไดเร็กทอรีอินพุต

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

รอสักครู่จนกว่าไฟล์จะถูกเรียกใช้งาน หลังจากดำเนินการแล้วผลลัพธ์จะมีการแยกอินพุตจำนวนหนึ่งงานแผนที่งานลดและอื่น ๆ

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - คำสั่งต่อไปนี้ใช้เพื่อตรวจสอบไฟล์ผลลัพธ์ในโฟลเดอร์ผลลัพธ์

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - คำสั่งต่อไปนี้ใช้เพื่อดูผลลัพธ์ใน Part-00000ไฟล์. ไฟล์นี้สร้างโดย HDFS

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

ต่อไปนี้เป็นผลลัพธ์ที่สร้างโดยโปรแกรม MapReduce -

พ.ศ. 2524 34
พ.ศ. 2527 40
พ.ศ. 2528 45

Step 10 - คำสั่งต่อไปนี้ใช้เพื่อคัดลอกโฟลเดอร์ผลลัพธ์จาก HDFS ไปยังระบบไฟล์ภายในเครื่อง

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop