Hadoop - MapReduce

MapReduce adalah kerangka kerja yang digunakan untuk menulis aplikasi untuk memproses data dalam jumlah besar, secara paralel, pada kelompok besar perangkat keras komoditas dengan cara yang andal.

Apa itu MapReduce?

MapReduce adalah teknik pemrosesan dan model program untuk komputasi terdistribusi berbasis java. Algoritma MapReduce berisi dua tugas penting, yaitu Map dan Reduce. Map mengambil satu set data dan mengubahnya menjadi set data lain, di mana elemen individual dipecah menjadi tupel (pasangan kunci / nilai). Kedua, kurangi tugas, yang mengambil keluaran dari peta sebagai masukan dan menggabungkan tupel data tersebut menjadi satu set tupel yang lebih kecil. Seperti yang tersirat dalam urutan nama MapReduce, tugas pengurangan selalu dilakukan setelah tugas peta.

Keuntungan utama MapReduce adalah kemudahan untuk mengukur pemrosesan data melalui beberapa node komputasi. Di bawah model MapReduce, primitif pemrosesan data disebut pembuat peta dan pereduksi. Menguraikan aplikasi pemrosesan data menjadi pembuat peta dan pereduksi terkadang tidak sepele. Tapi, begitu kita menulis aplikasi dalam bentuk MapReduce, penskalaan aplikasi untuk menjalankan ratusan, ribuan, atau bahkan puluhan ribu mesin dalam sebuah cluster hanyalah perubahan konfigurasi. Skalabilitas sederhana inilah yang telah menarik banyak programmer untuk menggunakan model MapReduce.

Algoritma

  • Umumnya paradigma MapReduce didasarkan pada pengiriman komputer ke tempat data berada!

  • Program MapReduce dijalankan dalam tiga tahap, yaitu tahap peta, tahap shuffle, dan tahap pengurangan.

    • Map stage- Tugas map atau mapper adalah mengolah data masukan. Umumnya data masukan berupa file atau direktori dan disimpan dalam file system Hadoop (HDFS). File masukan diteruskan ke fungsi mapper baris demi baris. Pemeta memproses data dan membuat beberapa potongan kecil data.

    • Reduce stage - Tahap ini adalah kombinasi dari Shuffle panggung dan Reducetahap. Tugas Reducer adalah memproses data yang berasal dari mapper. Setelah diproses, ini menghasilkan satu set output baru, yang akan disimpan di HDFS.

  • Selama tugas MapReduce, Hadoop mengirim tugas Map dan Reduce ke server yang sesuai di cluster.

  • Kerangka kerja mengelola semua detail penyaluran data seperti mengeluarkan tugas, memverifikasi penyelesaian tugas, dan menyalin data di sekitar cluster di antara node.

  • Sebagian besar komputasi terjadi pada node dengan data pada disk lokal yang mengurangi lalu lintas jaringan.

  • Setelah menyelesaikan tugas yang diberikan, cluster mengumpulkan dan mengurangi data untuk membentuk hasil yang sesuai, dan mengirimkannya kembali ke server Hadoop.

Input dan Output (Perspektif Java)

Kerangka kerja MapReduce beroperasi pada pasangan <kunci, nilai>, yaitu, kerangka kerja melihat masukan ke pekerjaan sebagai satu set pasangan <kunci, nilai> dan menghasilkan satu set pasangan <kunci, nilai> sebagai keluaran dari pekerjaan. , mungkin dari berbagai jenis.

Kelas kunci dan nilai harus berseri oleh kerangka kerja dan karenanya, perlu mengimplementasikan antarmuka Writable. Selain itu, kelas kunci harus mengimplementasikan antarmuka Writable-Comparable untuk memfasilitasi pengurutan berdasarkan kerangka kerja. Jenis Input dan Output aMapReduce job - (Masukan) <k1, v1> → peta → <k2, v2> → kurangi → <k3, v3> (Output).

Memasukkan Keluaran
Peta <k1, v1> daftar (<k2, v2>)
Mengurangi <k2, daftar (v2)> daftar (<k3, v3>)

Terminologi

  • PayLoad - Aplikasi menerapkan fungsi Peta dan Kurangi, dan membentuk inti pekerjaan.

  • Mapper - Pemeta memetakan pasangan kunci / nilai masukan ke satu set pasangan kunci / nilai antara.

  • NamedNode - Node yang mengelola Hadoop Distributed File System (HDFS).

  • DataNode - Node tempat data disajikan terlebih dahulu sebelum pemrosesan apa pun dilakukan.

  • MasterNode - Node tempat JobTracker berjalan dan yang menerima permintaan pekerjaan dari klien.

  • SlaveNode - Node tempat program Map and Reduce berjalan.

  • JobTracker - Menjadwalkan pekerjaan dan melacak tugas yang ditugaskan ke pelacak Tugas.

  • Task Tracker - Melacak tugas dan melaporkan status ke JobTracker.

  • Job - Program adalah eksekusi Mapper dan Reducer di seluruh dataset.

  • Task - Eksekusi Pemeta atau Peredam pada sepotong data.

  • Task Attempt - Contoh tertentu dari upaya untuk menjalankan tugas di SlaveNode.

Contoh Skenario

Diberikan di bawah ini adalah data mengenai konsumsi listrik suatu organisasi. Ini berisi konsumsi listrik bulanan dan rata-rata tahunan selama beberapa tahun.

Jan Feb Merusak Apr Mungkin Jun Jul Agustus Sep Okt Nov Des Rata-rata
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Apabila data di atas diberikan sebagai masukan, maka kita harus menulis aplikasi untuk mengolahnya dan menghasilkan hasil seperti mencari tahun pemakaian maksimal, tahun pemakaian minimal, dan lain sebagainya. Ini adalah panduan bagi para programmer dengan jumlah record yang terbatas. Mereka hanya akan menulis logika untuk menghasilkan output yang diperlukan, dan meneruskan data ke aplikasi yang ditulis.

Tapi, pikirkan data yang mewakili konsumsi listrik dari semua industri skala besar di negara bagian tertentu, sejak pembentukannya.

Saat kami menulis aplikasi untuk memproses data massal tersebut,

  • Mereka akan membutuhkan banyak waktu untuk dieksekusi.

  • Akan ada lalu lintas jaringan yang padat ketika kita memindahkan data dari sumber ke server jaringan dan seterusnya.

Untuk mengatasi masalah ini, kami memiliki kerangka kerja MapReduce.

Memasukan data

Data di atas disimpan sebagai sample.txtdan diberikan sebagai masukan. File input terlihat seperti yang ditunjukkan di bawah ini.

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

Contoh Program

Diberikan di bawah ini adalah program untuk data sampel menggunakan kerangka kerja MapReduce.

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
}

Simpan program di atas sebagai ProcessUnits.java. Kompilasi dan pelaksanaan program dijelaskan di bawah ini.

Kompilasi dan Pelaksanaan Program Unit Proses

Mari kita asumsikan bahwa kita berada di direktori home dari pengguna Hadoop (misalnya / home / hadoop).

Ikuti langkah-langkah yang diberikan di bawah ini untuk mengkompilasi dan menjalankan program di atas.

Langkah 1

Perintah berikut adalah untuk membuat direktori untuk menyimpan kelas java yang dikompilasi.

$ mkdir units

Langkah 2

Unduh Hadoop-core-1.2.1.jar,yang digunakan untuk mengkompilasi dan menjalankan program MapReduce. Kunjungi tautan berikut mvnrepository.com untuk mengunduh jar. Mari kita asumsikan folder yang diunduh adalah/home/hadoop/.

LANGKAH 3

Perintah berikut digunakan untuk mengompilasi file ProcessUnits.java program dan membuat toples untuk program tersebut.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ .

LANGKAH 4

Perintah berikut digunakan untuk membuat direktori input di HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

LANGKAH 5

Perintah berikut digunakan untuk menyalin file input bernama sample.txtdi direktori input HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

LANGKAH 6

Perintah berikut digunakan untuk memverifikasi file di direktori input.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Langkah 7

Perintah berikut digunakan untuk menjalankan aplikasi Eleunit_max dengan mengambil file input dari direktori input.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Tunggu beberapa saat hingga file tersebut dijalankan. Setelah eksekusi, seperti yang ditunjukkan di bawah ini, output akan berisi jumlah input split, jumlah tugas Map, jumlah tugas peredam, dll.

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40

LANGKAH 8

Perintah berikut digunakan untuk memverifikasi file yang dihasilkan di folder keluaran.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

LANGKAH 9

Perintah berikut digunakan untuk melihat keluaran dalam Part-00000 mengajukan. File ini dibuat oleh HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Di bawah ini adalah output yang dihasilkan oleh program MapReduce.

1981    34 
1984    40 
1985    45

LANGKAH 10

Perintah berikut digunakan untuk menyalin folder keluaran dari HDFS ke sistem file lokal untuk dianalisis.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

Perintah Penting

Semua perintah Hadoop dipanggil oleh $HADOOP_HOME/bin/hadoopperintah. Menjalankan skrip Hadoop tanpa argumen akan mencetak deskripsi untuk semua perintah.

Usage - PERINTAH hadoop [--config confdir]

Tabel berikut mencantumkan opsi yang tersedia dan deskripsinya.

Sr.No. Opsi & Deskripsi
1

namenode -format

Memformat sistem file DFS.

2

secondarynamenode

Menjalankan kode nama sekunder DFS.

3

namenode

Menjalankan kode nama DFS.

4

datanode

Menjalankan datanode DFS.

5

dfsadmin

Menjalankan klien admin DFS.

6

mradmin

Menjalankan klien admin Map-Reduce.

7

fsck

Menjalankan utilitas pemeriksaan sistem file DFS.

8

fs

Menjalankan klien pengguna sistem file generik.

9

balancer

Menjalankan utilitas keseimbangan cluster.

10

oiv

Menerapkan fsimage viewer offline ke fsimage.

11

fetchdt

Mengambil token delegasi dari NameNode.

12

jobtracker

Menjalankan node Pelacak pekerjaan MapReduce.

13

pipes

Menjalankan pekerjaan Pipes.

14

tasktracker

Menjalankan node Tracker tugas MapReduce.

15

historyserver

Menjalankan server riwayat pekerjaan sebagai daemon mandiri.

16

job

Memanipulasi pekerjaan MapReduce.

17

queue

Mendapatkan informasi tentang JobQueues.

18

version

Mencetak versinya.

19

jar <jar>

Menjalankan file jar.

20

distcp <srcurl> <desturl>

Menyalin file atau direktori secara rekursif.

21

distcp2 <srcurl> <desturl>

DistCp versi 2.

22

archive -archiveName NAME -p <parent path> <src>* <dest>

Membuat arsip hadoop.

23

classpath

Mencetak jalur kelas yang diperlukan untuk mendapatkan Hadoop jar dan perpustakaan yang diperlukan.

24

daemonlog

Dapatkan / Setel level log untuk setiap daemon

Bagaimana Berinteraksi dengan Pekerjaan MapReduce

Penggunaan - pekerjaan hadoop [GENERIC_OPTIONS]

Berikut ini adalah Opsi Generik yang tersedia dalam pekerjaan Hadoop.

Sr.No. GENERIC_OPTION & Deskripsi
1

-submit <job-file>

Mengirimkan pekerjaan.

2

-status <job-id>

Mencetak peta dan mengurangi persentase penyelesaian dan semua penghitung pekerjaan.

3

-counter <job-id> <group-name> <countername>

Mencetak nilai penghitung.

4

-kill <job-id>

Membunuh pekerjaan itu.

5

-events <job-id> <fromevent-#> <#-of-events>

Mencetak detail peristiwa yang diterima oleh jobtracker untuk rentang tertentu.

6

-history [all] <jobOutputDir> - history < jobOutputDir>

Mencetak detail pekerjaan, detail tip gagal dan mati. Rincian lebih lanjut tentang pekerjaan seperti tugas yang berhasil dan upaya tugas yang dilakukan untuk setiap tugas dapat dilihat dengan menentukan opsi [semua].

7

-list[all]

Menampilkan semua pekerjaan. -list hanya menampilkan pekerjaan yang belum selesai.

8

-kill-task <task-id>

Hentikan tugas itu. Tugas yang dihentikan TIDAK dihitung sebagai upaya yang gagal.

9

-fail-task <task-id>

Gagal menjalankan tugas. Tugas yang gagal dihitung dari upaya yang gagal.

10

-set-priority <job-id> <priority>

Mengubah prioritas pekerjaan. Nilai prioritas yang diizinkan adalah VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

Untuk melihat status pekerjaan

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

Untuk melihat riwayat pekerjaan output-dir

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

Untuk menghentikan pekerjaan itu

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004