MapReduce - การใช้งาน Hadoop
MapReduce เป็นเฟรมเวิร์กที่ใช้สำหรับการเขียนแอพพลิเคชั่นเพื่อประมวลผลข้อมูลจำนวนมหาศาลบนกลุ่มฮาร์ดแวร์สินค้าขนาดใหญ่ในลักษณะที่เชื่อถือได้ บทนี้จะนำคุณไปสู่การทำงานของ MapReduce ใน Hadoop framework โดยใช้ Java
อัลกอริทึม MapReduce
โดยทั่วไปกระบวนทัศน์ MapReduce จะขึ้นอยู่กับการส่งโปรแกรมลดแผนที่ไปยังคอมพิวเตอร์ที่มีข้อมูลจริงอยู่
ระหว่างงาน MapReduce Hadoop จะส่งแผนที่และงานลดไปยังเซิร์ฟเวอร์ที่เหมาะสมในคลัสเตอร์
เฟรมเวิร์กจัดการรายละเอียดทั้งหมดของการส่งผ่านข้อมูลเช่นการออกงานการตรวจสอบความสมบูรณ์ของงานและการคัดลอกข้อมูลรอบคลัสเตอร์ระหว่างโหนด
การประมวลผลส่วนใหญ่เกิดขึ้นบนโหนดที่มีข้อมูลบนดิสก์ภายในซึ่งช่วยลดปริมาณการใช้งานเครือข่าย
หลังจากเสร็จสิ้นภารกิจที่กำหนดคลัสเตอร์จะรวบรวมและลดข้อมูลเพื่อสร้างผลลัพธ์ที่เหมาะสมและส่งกลับไปยังเซิร์ฟเวอร์ Hadoop
อินพุตและเอาต์พุต (Java Perspective)
เฟรมเวิร์ก MapReduce ทำงานบนคู่คีย์ - ค่ากล่าวคือเฟรมเวิร์กดูอินพุตของงานเป็นชุดของคู่คีย์ - ค่าและสร้างชุดของคู่คีย์ - ค่าเป็นเอาต์พุตของงานซึ่งอาจเป็นประเภทต่างๆ
คลาสคีย์และค่าจะต้องทำให้เป็นอนุกรมได้ตามกรอบงานและด้วยเหตุนี้จึงจำเป็นต้องใช้อินเทอร์เฟซที่เขียนได้ นอกจากนี้คลาสหลักต้องใช้อินเทอร์เฟซ WritableComparable เพื่ออำนวยความสะดวกในการจัดเรียงตามกรอบงาน
ทั้งรูปแบบอินพุตและเอาต์พุตของงาน MapReduce อยู่ในรูปแบบของคู่คีย์ - ค่า -
(อินพุต) <k1, v1> -> แผนที่ -> <k2, v2> -> ลด -> <k3, v3> (เอาต์พุต)
อินพุต | เอาต์พุต | |
---|---|---|
แผนที่ | <k1, v1> | รายการ (<k2, v2>) |
ลด | <k2 รายการ (v2)> | รายการ (<k3, v3>) |
การใช้ MapReduce
ตารางต่อไปนี้แสดงข้อมูลเกี่ยวกับการใช้ไฟฟ้าขององค์กร ตารางประกอบด้วยปริมาณการใช้ไฟฟ้ารายเดือนและค่าเฉลี่ยรายปีเป็นเวลาห้าปีติดต่อกัน
ม.ค. | ก.พ. | มี.ค. | เม.ย. | อาจ | มิ.ย. | ก.ค. | ส.ค. | ก.ย. | ต.ค. | พ.ย. | ธ.ค. | ค่าเฉลี่ย | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
พ.ศ. 2522 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
พ.ศ. 2523 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
พ.ศ. 2524 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
พ.ศ. 2527 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
พ.ศ. 2528 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
เราจำเป็นต้องเขียนแอปพลิเคชันเพื่อประมวลผลข้อมูลอินพุตในตารางที่กำหนดเพื่อค้นหาปีที่ใช้งานสูงสุดปีของการใช้งานขั้นต่ำและอื่น ๆ งานนี้เป็นเรื่องง่ายสำหรับโปรแกรมเมอร์ที่มีจำนวนบันทึก จำกัด เนื่องจากพวกเขาจะเขียนตรรกะเพื่อสร้างผลลัพธ์ที่ต้องการและส่งข้อมูลไปยังแอปพลิเคชันที่เขียนขึ้น
ตอนนี้ให้เราเพิ่มขนาดของข้อมูลอินพุต สมมติว่าเราต้องวิเคราะห์การใช้ไฟฟ้าของอุตสาหกรรมขนาดใหญ่ทั้งหมดของรัฐใดรัฐหนึ่ง เมื่อเราเขียนแอปพลิเคชันเพื่อประมวลผลข้อมูลจำนวนมากดังกล่าว
พวกเขาจะใช้เวลามากในการดำเนินการ
จะมีปริมาณการใช้งานเครือข่ายหนาแน่นเมื่อเราย้ายข้อมูลจากต้นทางไปยังเซิร์ฟเวอร์เครือข่าย
ในการแก้ปัญหาเหล่านี้เรามีกรอบ MapReduce
ป้อนข้อมูล
ข้อมูลข้างต้นถูกบันทึกเป็น sample.txtและกำหนดให้เป็นอินพุต ไฟล์อินพุตมีลักษณะดังที่แสดงด้านล่าง
พ.ศ. 2522 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
พ.ศ. 2523 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
พ.ศ. 2524 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
พ.ศ. 2527 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
พ.ศ. 2528 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
ตัวอย่างโปรแกรม
โปรแกรมต่อไปนี้สำหรับข้อมูลตัวอย่างใช้กรอบ MapReduce
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
บันทึกโปรแกรมข้างต้นลงใน ProcessUnits.java. การรวบรวมและการดำเนินการของโปรแกรมมีดังต่อไปนี้
การคอมไพล์และการดำเนินการของโปรแกรม ProcessUnits
สมมติว่าเราอยู่ในโฮมไดเร็กทอรีของผู้ใช้ Hadoop (เช่น / home / hadoop)
ทำตามขั้นตอนด้านล่างเพื่อคอมไพล์และรันโปรแกรมข้างต้น
Step 1 - ใช้คำสั่งต่อไปนี้เพื่อสร้างไดเร็กทอรีเพื่อจัดเก็บคลาส java ที่คอมไพล์
$ mkdir units
Step 2- ดาวน์โหลด Hadoop-core-1.2.1.jar ซึ่งใช้ในการคอมไพล์และรันโปรแกรม MapReduce ดาวน์โหลดขวดจากmvnrepository.com ให้เราถือว่าโฟลเดอร์ดาวน์โหลดคือ / home / hadoop /
Step 3 - คำสั่งต่อไปนี้ใช้เพื่อรวบรวมไฟล์ ProcessUnits.java โปรแกรมและสร้าง jar สำหรับโปรแกรม
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 - คำสั่งต่อไปนี้ใช้เพื่อสร้างไดเร็กทอรีอินพุตใน HDFS
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - คำสั่งต่อไปนี้ใช้เพื่อคัดลอกไฟล์อินพุตที่ชื่อ sample.txt ในไดเร็กทอรีอินพุตของ HDFS
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 - คำสั่งต่อไปนี้ใช้เพื่อตรวจสอบไฟล์ในไดเร็กทอรีอินพุต
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - คำสั่งต่อไปนี้ใช้เพื่อรันแอปพลิเคชัน Eleunit_max โดยรับไฟล์อินพุตจากไดเร็กทอรีอินพุต
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
รอสักครู่จนกว่าไฟล์จะถูกเรียกใช้งาน หลังจากดำเนินการแล้วผลลัพธ์จะมีการแยกอินพุตจำนวนหนึ่งงานแผนที่งานลดและอื่น ๆ
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 - คำสั่งต่อไปนี้ใช้เพื่อตรวจสอบไฟล์ผลลัพธ์ในโฟลเดอร์ผลลัพธ์
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - คำสั่งต่อไปนี้ใช้เพื่อดูผลลัพธ์ใน Part-00000ไฟล์. ไฟล์นี้สร้างโดย HDFS
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
ต่อไปนี้เป็นผลลัพธ์ที่สร้างโดยโปรแกรม MapReduce -
พ.ศ. 2524 | 34 |
พ.ศ. 2527 | 40 |
พ.ศ. 2528 | 45 |
Step 10 - คำสั่งต่อไปนี้ใช้เพื่อคัดลอกโฟลเดอร์ผลลัพธ์จาก HDFS ไปยังระบบไฟล์ภายในเครื่อง
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop