Hadoop - MapReduce
MapReduce là một khung sử dụng mà chúng ta có thể viết các ứng dụng để xử lý song song một lượng lớn dữ liệu trên các cụm phần cứng hàng hóa lớn một cách đáng tin cậy.
MapReduce là gì?
MapReduce là một kỹ thuật xử lý và một mô hình chương trình cho tính toán phân tán dựa trên java. Thuật toán MapReduce chứa hai nhiệm vụ quan trọng, đó là Bản đồ và Giảm. Bản đồ lấy một tập hợp dữ liệu và chuyển đổi thành một tập dữ liệu khác, trong đó các phần tử riêng lẻ được chia nhỏ thành các bộ giá trị (cặp khóa / giá trị). Thứ hai, giảm tác vụ, lấy đầu ra từ bản đồ làm đầu vào và kết hợp các bộ dữ liệu đó thành một bộ nhỏ hơn các bộ giá trị. Như trình tự của tên MapReduce ngụ ý, tác vụ thu gọn luôn được thực hiện sau tác vụ bản đồ.
Ưu điểm chính của MapReduce là dễ dàng mở rộng quy mô xử lý dữ liệu qua nhiều nút máy tính. Trong mô hình MapReduce, các nguyên tắc xử lý dữ liệu được gọi là các trình ánh xạ và trình giảm bớt. Việc phân rã một ứng dụng xử lý dữ liệu thành các trình lập bản đồ và trình giảm thiểu đôi khi không phải là chuyện nhỏ. Tuy nhiên, một khi chúng tôi viết một ứng dụng ở dạng MapReduce, việc mở rộng ứng dụng để chạy trên hàng trăm, hàng nghìn hoặc thậm chí hàng chục nghìn máy trong một cụm chỉ đơn thuần là thay đổi cấu hình. Khả năng mở rộng đơn giản này là điều đã thu hút nhiều lập trình viên sử dụng mô hình MapReduce.
Thuật toán
Nói chung, mô hình MapReduce dựa trên việc gửi máy tính đến nơi chứa dữ liệu!
Chương trình MapReduce thực hiện trong ba giai đoạn, đó là giai đoạn bản đồ, giai đoạn xáo trộn và giai đoạn giảm bớt.
Map stage- Công việc của map hay mapper là xử lý dữ liệu đầu vào. Nói chung, dữ liệu đầu vào ở dạng tệp hoặc thư mục và được lưu trữ trong hệ thống tệp Hadoop (HDFS). Tệp đầu vào được chuyển tới hàm ánh xạ từng dòng. Trình ánh xạ xử lý dữ liệu và tạo ra một số phần nhỏ dữ liệu.
Reduce stage - Giai đoạn này là sự kết hợp của Shuffle sân khấu và Reducesân khấu. Công việc của Reducer là xử lý dữ liệu đến từ trình ánh xạ. Sau khi xử lý, nó tạo ra một tập hợp đầu ra mới, sẽ được lưu trữ trong HDFS.
Trong một công việc MapReduce, Hadoop gửi các tác vụ Bản đồ và Rút gọn đến các máy chủ thích hợp trong cụm.
Khung quản lý tất cả các chi tiết của việc truyền dữ liệu như phát hành nhiệm vụ, xác minh việc hoàn thành nhiệm vụ và sao chép dữ liệu xung quanh cụm giữa các nút.
Hầu hết quá trình tính toán diễn ra trên các nút với dữ liệu trên đĩa cục bộ làm giảm lưu lượng mạng.
Sau khi hoàn thành các nhiệm vụ nhất định, cụm thu thập và giảm dữ liệu để tạo thành một kết quả thích hợp và gửi nó trở lại máy chủ Hadoop.
Đầu vào và đầu ra (Phối cảnh Java)
Khung công tác MapReduce hoạt động trên các cặp <key, value>, nghĩa là khung xem đầu vào cho công việc dưới dạng tập hợp các cặp <key, value> và tạo ra một tập hợp các cặp <key, value> là đầu ra của công việc , có thể hình dung các loại khác nhau.
Khóa và các lớp giá trị phải được khuôn khổ tuần tự hóa theo cách thức và do đó, cần phải triển khai giao diện ghi. Ngoài ra, các lớp khóa phải triển khai giao diện Có thể ghi được-Có thể so sánh để tạo điều kiện sắp xếp theo khung. Các loại đầu vào và đầu ra của mộtMapReduce job - (Đầu vào) <k1, v1> → ánh xạ → <k2, v2> → giảm → <k3, v3> (Đầu ra).
Đầu vào | Đầu ra | |
---|---|---|
Bản đồ | <k1, v1> | danh sách (<k2, v2>) |
Giảm | <k2, list (v2)> | danh sách (<k3, v3>) |
Thuật ngữ
PayLoad - Các ứng dụng triển khai Bản đồ và các chức năng Rút gọn, và tạo thành cốt lõi của công việc.
Mapper - Người lập bản đồ ánh xạ các cặp khóa / giá trị đầu vào thành một tập hợp các cặp khóa / giá trị trung gian.
NamedNode - Nút quản lý Hệ thống tệp phân tán Hadoop (HDFS).
DataNode - Nút nơi dữ liệu được trình bày trước trước khi bất kỳ quá trình xử lý nào diễn ra.
MasterNode - Nút nơi JobTracker chạy và chấp nhận các yêu cầu công việc từ khách hàng.
SlaveNode - Nút nơi chương trình Map và Reduce chạy.
JobTracker - Lên lịch công việc và theo dõi các công việc được giao vào Task tracker.
Task Tracker - Theo dõi nhiệm vụ và báo cáo trạng thái cho JobTracker.
Job - Một chương trình là sự thực thi của Mapper và Reducer trên một tập dữ liệu.
Task - Thực hiện một Mapper hoặc một Reducer trên một phần dữ liệu.
Task Attempt - Một trường hợp cụ thể của nỗ lực thực hiện một tác vụ trên SlaveNode.
Tình huống mẫu
Dưới đây là dữ liệu liên quan đến mức tiêu thụ điện của một tổ chức. Nó chứa mức tiêu thụ điện hàng tháng và mức trung bình hàng năm trong các năm khác nhau.
tháng một | Tháng hai | Mar | Tháng tư | có thể | Tháng sáu | Thg 7 | Tháng 8 | Tháng chín | Tháng 10 | Tháng 11 | Tháng mười hai | Trung bình | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Năm 1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Nếu dữ liệu trên được cung cấp làm đầu vào, chúng ta phải viết các ứng dụng để xử lý nó và đưa ra các kết quả như tìm năm sử dụng tối đa, năm sử dụng tối thiểu, v.v. Đây là bước đi dành cho các lập trình viên có số lượng bản ghi hữu hạn. Họ sẽ chỉ cần viết logic để tạo ra đầu ra cần thiết, và chuyển dữ liệu đến ứng dụng đã viết.
Tuy nhiên, hãy nghĩ đến dữ liệu đại diện cho mức tiêu thụ điện của tất cả các ngành công nghiệp quy mô lớn của một bang cụ thể, kể từ khi hình thành.
Khi chúng tôi viết ứng dụng để xử lý dữ liệu hàng loạt như vậy,
Họ sẽ mất rất nhiều thời gian để thực hiện.
Sẽ có một lưu lượng mạng lớn khi chúng ta di chuyển dữ liệu từ nguồn sang máy chủ mạng, v.v.
Để giải quyết những vấn đề này, chúng tôi có khung MapReduce.
Dữ liệu đầu vào
Dữ liệu trên được lưu dưới dạng sample.txtvà được cung cấp dưới dạng đầu vào. Tệp đầu vào trông như hình dưới đây.
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
Chương trình mẫu
Dưới đây là chương trình cho dữ liệu mẫu sử dụng khung MapReduce.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits {
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable ,/*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()) {
lasttoken = s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
//Reduce function
public void reduce( Text key, Iterator <IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxavg = 30;
int val = Integer.MIN_VALUE;
while (values.hasNext()) {
if((val = values.next().get())>maxavg) {
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception {
JobConf conf = new JobConf(ProcessUnits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Lưu chương trình trên thành ProcessUnits.java. Việc biên dịch và thực hiện chương trình được giải thích dưới đây.
Biên soạn và Thực hiện Chương trình Đơn vị Quy trình
Giả sử chúng ta đang ở trong thư mục chính của người dùng Hadoop (ví dụ: / home / hadoop).
Làm theo các bước dưới đây để biên dịch và thực thi chương trình trên.
Bước 1
Lệnh sau là tạo một thư mục để lưu trữ các lớp java đã biên dịch.
$ mkdir units
Bước 2
Tải xuống Hadoop-core-1.2.1.jar,được sử dụng để biên dịch và thực thi chương trình MapReduce. Truy cập liên kết sau mvnrepository.com để tải về jar. Giả sử thư mục đã tải xuống là/home/hadoop/.
Bước 3
Các lệnh sau được sử dụng để biên dịch ProcessUnits.java chương trình và tạo một jar cho chương trình.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Bước 4
Lệnh sau được sử dụng để tạo thư mục đầu vào trong HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Bước 5
Lệnh sau được sử dụng để sao chép tệp đầu vào có tên sample.txttrong thư mục đầu vào của HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Bước 6
Lệnh sau được sử dụng để xác minh các tệp trong thư mục đầu vào.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Bước 7
Lệnh sau được sử dụng để chạy ứng dụng Eleunit_max bằng cách lấy các tệp đầu vào từ thư mục đầu vào.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Chờ một lúc cho đến khi tệp được thực thi. Sau khi thực thi, như hình dưới đây, đầu ra sẽ chứa số lượng đầu vào chia tách, số lượng tác vụ Bản đồ, số lượng tác vụ giảm tốc, v.v.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2 Job Counters
Launched map tasks = 2
Launched reduce tasks = 1
Data-local map tasks = 2
Total time spent by all maps in occupied slots (ms) = 146137
Total time spent by all reduces in occupied slots (ms) = 441
Total time spent by all map tasks (ms) = 14613
Total time spent by all reduce tasks (ms) = 44120
Total vcore-seconds taken by all map tasks = 146137
Total vcore-seconds taken by all reduce tasks = 44120
Total megabyte-seconds taken by all map tasks = 149644288
Total megabyte-seconds taken by all reduce tasks = 45178880
Map-Reduce Framework
Map input records = 5
Map output records = 5
Map output bytes = 45
Map output materialized bytes = 67
Input split bytes = 208
Combine input records = 5
Combine output records = 5
Reduce input groups = 5
Reduce shuffle bytes = 6
Reduce input records = 5
Reduce output records = 5
Spilled Records = 10
Shuffled Maps = 2
Failed Shuffles = 0
Merged Map outputs = 2
GC time elapsed (ms) = 948
CPU time spent (ms) = 5160
Physical memory (bytes) snapshot = 47749120
Virtual memory (bytes) snapshot = 2899349504
Total committed heap usage (bytes) = 277684224
File Output Format Counters
Bytes Written = 40
Bước 8
Lệnh sau được sử dụng để xác minh các tệp kết quả trong thư mục đầu ra.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Bước 9
Lệnh sau được sử dụng để xem kết quả đầu ra trong Part-00000 tập tin. Tệp này được tạo bởi HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Dưới đây là kết quả do chương trình MapReduce tạo ra.
1981 34
1984 40
1985 45
Bước 10
Lệnh sau được sử dụng để sao chép thư mục đầu ra từ HDFS sang hệ thống tệp cục bộ để phân tích.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
Các lệnh quan trọng
Tất cả các lệnh Hadoop được gọi bởi $HADOOP_HOME/bin/hadoopchỉ huy. Chạy tập lệnh Hadoop mà không có bất kỳ đối số nào sẽ in mô tả cho tất cả các lệnh.
Usage - hadoop [--config confdir] COMMAND
Bảng sau liệt kê các tùy chọn có sẵn và mô tả của chúng.
Sr.No. | Tùy chọn & Mô tả |
---|---|
1 | namenode -format Định dạng hệ thống tệp DFS. |
2 | secondarynamenode Chạy nút tên phụ DFS. |
3 | namenode Chạy nút tên DFS. |
4 | datanode Chạy một nút dữ liệu DFS. |
5 | dfsadmin Chạy ứng dụng quản trị DFS. |
6 | mradmin Chạy ứng dụng quản trị Map-Reduce. |
7 | fsck Chạy tiện ích kiểm tra hệ thống tệp DFS. |
số 8 | fs Chạy một máy khách người dùng hệ thống tệp chung. |
9 | balancer Chạy tiện ích cân bằng cụm. |
10 | oiv Áp dụng trình xem fsimage ngoại tuyến cho fsimage. |
11 | fetchdt Tìm nạp mã thông báo ủy quyền từ NameNode. |
12 | jobtracker Chạy nút MapReduce job Tracker. |
13 | pipes Chạy công việc Pipes. |
14 | tasktracker Chạy một nút Trình theo dõi tác vụ MapReduce. |
15 | historyserver Chạy các máy chủ lịch sử công việc như một daemon độc lập. |
16 | job Thao tác các công việc MapReduce. |
17 | queue Nhận thông tin về JobQueues. |
18 | version In phiên bản. |
19 | jar <jar> Chạy một tệp jar. |
20 | distcp <srcurl> <desturl> Sao chép tệp hoặc thư mục một cách đệ quy. |
21 | distcp2 <srcurl> <desturl> Phiên bản DistCp 2. |
22 | archive -archiveName NAME -p <parent path> <src>* <dest> Tạo một kho lưu trữ hadoop. |
23 | classpath In đường dẫn lớp cần thiết để lấy Hadoop jar và các thư viện bắt buộc. |
24 | daemonlog Nhận / Đặt cấp độ nhật ký cho mỗi daemon |
Cách tương tác với MapReduce Jobs
Sử dụng - công việc hadoop [GENERIC_OPTIONS]
Sau đây là các Tùy chọn Chung có sẵn trong công việc Hadoop.
Sr.No. | GENERIC_OPTION & Mô tả |
---|---|
1 | -submit <job-file> Nộp công việc. |
2 | -status <job-id> In bản đồ và giảm tỷ lệ phần trăm hoàn thành và tất cả các quầy công việc. |
3 | -counter <job-id> <group-name> <countername> In giá trị bộ đếm. |
4 | -kill <job-id> Giết công việc. |
5 | -events <job-id> <fromevent-#> <#-of-events> In các chi tiết của sự kiện mà trình theo dõi công việc nhận được cho phạm vi nhất định. |
6 | -history [all] <jobOutputDir> - history < jobOutputDir> In chi tiết công việc, chi tiết mẹo bị lỗi và bị chết. Có thể xem thêm thông tin chi tiết về công việc như các nhiệm vụ thành công và các nỗ lực thực hiện cho từng nhiệm vụ bằng cách chỉ định tùy chọn [tất cả]. |
7 | -list[all] Hiển thị tất cả các công việc. -danh sách chỉ hiển thị các công việc chưa hoàn thành. |
số 8 | -kill-task <task-id> Giết nhiệm vụ. Các nhiệm vụ bị giết KHÔNG được tính vào các lần thử không thành công. |
9 | -fail-task <task-id> Không hoàn thành nhiệm vụ. Các nhiệm vụ không thành công được tính cho các lần không thành công. |
10 | -set-priority <job-id> <priority> Thay đổi mức độ ưu tiên của công việc. Các giá trị ưu tiên được phép là VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW |
Để xem tình trạng công việc
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
Để xem lịch sử công việc đầu ra-dir
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
Giết công việc
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004