MapReduce - Implementasi Hadoop
MapReduce adalah kerangka kerja yang digunakan untuk menulis aplikasi guna memproses data dalam jumlah besar pada kelompok besar perangkat keras komoditas dengan cara yang andal. Bab ini memandu Anda mempelajari pengoperasian MapReduce dalam kerangka kerja Hadoop menggunakan Java.
Algoritma MapReduce
Umumnya paradigma MapReduce didasarkan pada pengiriman program pengurangan peta ke komputer di mana data aktual berada.
Selama tugas MapReduce, Hadoop mengirim tugas Map dan Reduce ke server yang sesuai di cluster.
Kerangka kerja ini mengelola semua detail penyaluran data seperti mengeluarkan tugas, memverifikasi penyelesaian tugas, dan menyalin data di sekitar cluster di antara node.
Sebagian besar komputasi terjadi pada node dengan data pada disk lokal yang mengurangi lalu lintas jaringan.
Setelah menyelesaikan tugas tertentu, cluster mengumpulkan dan mengurangi data untuk membentuk hasil yang sesuai, dan mengirimkannya kembali ke server Hadoop.
Input dan Output (Perspektif Java)
Kerangka kerja MapReduce beroperasi pada pasangan nilai-kunci, yaitu kerangka kerja memandang masukan ke pekerjaan sebagai satu set pasangan nilai-kunci dan menghasilkan satu set pasangan nilai-kunci sebagai keluaran dari pekerjaan, yang dibayangkan dari jenis yang berbeda.
Kelas kunci dan nilai harus dapat diserialkan oleh kerangka kerja dan karenanya, diperlukan untuk mengimplementasikan antarmuka Writable. Selain itu, kelas kunci harus mengimplementasikan antarmuka WritableComparable untuk memfasilitasi pengurutan berdasarkan kerangka kerja.
Baik format input dan output dari pekerjaan MapReduce dalam bentuk pasangan nilai-kunci -
(Input) <k1, v1> -> map -> <k2, v2> -> kurangi -> <k3, v3> (Output).
Memasukkan | Keluaran | |
---|---|---|
Peta | <k1, v1> | daftar (<k2, v2>) |
Mengurangi | <k2, daftar (v2)> | daftar (<k3, v3>) |
Implementasi MapReduce
Tabel berikut menunjukkan data mengenai konsumsi listrik suatu organisasi. Tabel tersebut mencakup konsumsi listrik bulanan dan rata-rata tahunan selama lima tahun berturut-turut.
Jan | Feb | Merusak | Apr | Mungkin | Jun | Jul | Agustus | Sep | Okt | Nov | Des | Rata-rata | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Kita perlu menulis aplikasi untuk memproses data masukan dalam tabel yang diberikan untuk menemukan tahun penggunaan maksimum, tahun penggunaan minimum, dan seterusnya. Tugas ini mudah bagi programmer dengan jumlah record yang terbatas, karena mereka hanya akan menulis logika untuk menghasilkan keluaran yang diperlukan, dan meneruskan data ke aplikasi tertulis.
Sekarang mari kita naikkan skala data masukan. Asumsikan kita harus menganalisis konsumsi listrik dari semua industri skala besar di negara bagian tertentu. Saat kami menulis aplikasi untuk memproses data massal tersebut,
Mereka akan membutuhkan banyak waktu untuk dieksekusi.
Akan ada lalu lintas jaringan yang padat ketika kita memindahkan data dari sumber ke server jaringan.
Untuk mengatasi masalah ini, kami memiliki kerangka kerja MapReduce.
Memasukan data
Data di atas disimpan sebagai sample.txtdan diberikan sebagai masukan. File input terlihat seperti yang ditunjukkan di bawah ini.
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Contoh Program
Program berikut untuk data sampel menggunakan kerangka kerja MapReduce.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Simpan program di atas ke dalam ProcessUnits.java. Kompilasi dan eksekusi program diberikan di bawah ini.
Kompilasi dan Eksekusi Program ProcessUnits
Mari kita asumsikan bahwa kita berada di direktori home dari pengguna Hadoop (misalnya / home / hadoop).
Ikuti langkah-langkah yang diberikan di bawah ini untuk mengkompilasi dan menjalankan program di atas.
Step 1 - Gunakan perintah berikut untuk membuat direktori untuk menyimpan kelas java yang dikompilasi.
$ mkdir units
Step 2- Unduh Hadoop-core-1.2.1.jar, yang digunakan untuk mengkompilasi dan menjalankan program MapReduce. Unduh jar dari mvnrepository.com . Mari kita asumsikan folder unduhan adalah / home / hadoop /.
Step 3 - Perintah berikut digunakan untuk mengkompilasi file ProcessUnits.java program dan membuat toples untuk program tersebut.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 - Perintah berikut digunakan untuk membuat direktori input di HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - Perintah berikut digunakan untuk menyalin file input bernama sample.txt di direktori input HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 - Perintah berikut digunakan untuk memverifikasi file di direktori input
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - Perintah berikut digunakan untuk menjalankan aplikasi Eleunit_max dengan mengambil file input dari direktori input.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Tunggu beberapa saat hingga file dieksekusi. Setelah dieksekusi, output berisi sejumlah input split, tugas Peta, tugas Peredam, dll.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 - Perintah berikut digunakan untuk memverifikasi file yang dihasilkan di folder keluaran.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - Perintah berikut digunakan untuk melihat keluaran dalam Part-00000mengajukan. File ini dibuat oleh HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Berikut adalah output yang dihasilkan oleh program MapReduce -
1981 | 34 |
1984 | 40 |
1985 | 45 |
Step 10 - Perintah berikut digunakan untuk menyalin folder output dari HDFS ke sistem file lokal.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop