MapReduce - Panduan Cepat
MapReduce adalah model pemrograman untuk menulis aplikasi yang dapat memproses Big Data secara paralel pada banyak node. MapReduce menyediakan kemampuan analitis untuk menganalisis data kompleks dalam jumlah besar.
Apa itu Big Data?
Big Data adalah kumpulan kumpulan data besar yang tidak dapat diproses menggunakan teknik komputasi tradisional. Misalnya, volume data yang dibutuhkan Facebook atau Youtube untuk dikumpulkan dan dikelola setiap hari, dapat termasuk dalam kategori Big Data. Namun, Big Data tidak hanya tentang skala dan volume, tetapi juga melibatkan satu atau lebih aspek berikut - Kecepatan, Variasi, Volume, dan Kompleksitas.
Mengapa MapReduce?
Sistem Perusahaan Tradisional biasanya memiliki server terpusat untuk menyimpan dan memproses data. Ilustrasi berikut menggambarkan pandangan skematis dari sistem perusahaan tradisional. Model tradisional tentunya tidak cocok untuk memproses volume besar data yang dapat diskalakan dan tidak dapat diakomodasi oleh server database standar. Selain itu, sistem terpusat menciptakan terlalu banyak hambatan saat memproses banyak file secara bersamaan.
Google memecahkan masalah kemacetan ini menggunakan algoritma yang disebut MapReduce. MapReduce membagi tugas menjadi bagian-bagian kecil dan menetapkannya ke banyak komputer. Nantinya, hasil dikumpulkan di satu tempat dan diintegrasikan untuk membentuk dataset hasil.
Bagaimana MapReduce Bekerja?
Algoritma MapReduce berisi dua tugas penting, yaitu Map dan Reduce.
Tugas Peta mengambil sekumpulan data dan mengubahnya menjadi kumpulan data lain, di mana elemen individual dipecah menjadi tupel (pasangan nilai-kunci).
Tugas Reduce mengambil output dari Map sebagai input dan menggabungkan tupel data tersebut (key-value pair) menjadi sekumpulan tupel yang lebih kecil.
Tugas pengurangan selalu dilakukan setelah pekerjaan peta.
Sekarang, mari kita cermati masing-masing fase dan coba pahami signifikansinya.
Input Phase - Di sini kita memiliki Pembaca Rekaman yang menerjemahkan setiap catatan dalam file masukan dan mengirimkan data yang diuraikan ke pembuat peta dalam bentuk pasangan nilai kunci.
Map - Peta adalah fungsi yang ditentukan pengguna, yang mengambil serangkaian pasangan nilai kunci dan memprosesnya masing-masing untuk menghasilkan nol atau lebih pasangan nilai kunci.
Intermediate Keys - Pasangan kunci-nilai yang dihasilkan oleh mapper dikenal sebagai kunci perantara.
Combiner- Penggabung adalah jenis Peredam lokal yang mengelompokkan data serupa dari fase peta ke dalam set yang dapat diidentifikasi. Ini mengambil kunci perantara dari mapper sebagai input dan menerapkan kode yang ditentukan pengguna untuk menggabungkan nilai dalam lingkup kecil satu mapper. Ini bukan bagian dari algoritma MapReduce utama; itu opsional.
Shuffle and Sort- Tugas Peredam dimulai dengan langkah Acak dan Urutkan. Ini mengunduh pasangan nilai kunci yang dikelompokkan ke mesin lokal, tempat Peredam berjalan. Pasangan nilai kunci individual diurutkan berdasarkan kunci ke dalam daftar data yang lebih besar. Daftar data mengelompokkan kunci yang setara sehingga nilainya dapat diiterasi dengan mudah dalam tugas Reducer.
Reducer- Peredam mengambil data pasangan nilai kunci yang dikelompokkan sebagai input dan menjalankan fungsi Peredam pada masing-masing data. Di sini, data dapat dikumpulkan, difilter, dan digabungkan dalam beberapa cara, dan memerlukan berbagai pemrosesan. Setelah eksekusi selesai, ini memberikan nol atau lebih pasangan nilai kunci ke langkah terakhir.
Output Phase - Dalam fase keluaran, kita memiliki pemformat keluaran yang menerjemahkan pasangan nilai kunci akhir dari fungsi Reducer dan menuliskannya ke dalam file menggunakan penulis rekaman.
Mari kita coba memahami dua tugas Map & f Reduce dengan bantuan diagram kecil -
MapReduce-Example
Mari kita ambil contoh dunia nyata untuk memahami kekuatan MapReduce. Twitter menerima sekitar 500 juta kicauan per hari, yang berarti hampir 3.000 kicauan per detik. Ilustrasi berikut menunjukkan bagaimana Tweeter mengelola tweet-nya dengan bantuan MapReduce.
Seperti yang diperlihatkan dalam ilustrasi, algoritma MapReduce melakukan tindakan berikut -
Tokenize - Tokenize tweet ke dalam peta token dan menulisnya sebagai pasangan nilai kunci.
Filter - Memfilter kata-kata yang tidak diinginkan dari peta token dan menulis peta yang difilter sebagai pasangan nilai kunci.
Count - Menghasilkan penghitung token per kata.
Aggregate Counters - Mempersiapkan agregat nilai penghitung serupa ke dalam unit-unit kecil yang dapat dikelola.
Algoritma MapReduce berisi dua tugas penting, yaitu Map dan Reduce.
- Tugas peta dilakukan dengan menggunakan Mapper Class
- Tugas pengurangan dilakukan dengan menggunakan Kelas Peredam.
Kelas mapper mengambil masukan, membuat token, memetakan, dan mengurutkannya. Output dari kelas Mapper digunakan sebagai input oleh kelas Reducer, yang pada gilirannya mencari pasangan yang cocok dan menguranginya.
MapReduce menerapkan berbagai algoritma matematika untuk membagi tugas menjadi bagian-bagian kecil dan menetapkannya ke banyak sistem. Dalam istilah teknis, algoritma MapReduce membantu dalam mengirimkan tugas Map & Reduce ke server yang sesuai dalam sebuah cluster.
Algoritma matematika ini mungkin termasuk yang berikut -
- Sorting
- Searching
- Indexing
- TF-IDF
Penyortiran
Pengurutan adalah salah satu algoritma MapReduce dasar untuk memproses dan menganalisis data. MapReduce menerapkan algoritme pengurutan untuk secara otomatis mengurutkan pasangan nilai kunci keluaran dari mapper menurut kuncinya.
Metode pengurutan diimplementasikan di kelas mapper itu sendiri.
Dalam fase Acak dan Urutkan, setelah memberi token pada nilai di kelas mapper, file Context class (kelas yang ditentukan pengguna) mengumpulkan kunci bernilai yang cocok sebagai koleksi.
Untuk mengumpulkan pasangan nilai kunci yang serupa (kunci perantara), kelas Mapper membutuhkan bantuan RawComparator kelas untuk mengurutkan pasangan nilai kunci.
Kumpulan pasangan nilai kunci perantara untuk Peredam tertentu diurutkan secara otomatis oleh Hadoop untuk membentuk nilai kunci (K2, {V2, V2,…}) sebelum disajikan ke Peredam.
Mencari
Pencarian memainkan peran penting dalam algoritma MapReduce. Ini membantu dalam fase penggabung (opsional) dan dalam fase Peredam. Mari kita coba memahami cara kerja Penelusuran dengan bantuan sebuah contoh.
Contoh
Contoh berikut menunjukkan bagaimana MapReduce menggunakan algoritma Pencarian untuk mengetahui detail karyawan yang memperoleh gaji tertinggi dalam kumpulan data karyawan tertentu.
Mari kita asumsikan kita memiliki data karyawan dalam empat file berbeda - A, B, C, dan D. Mari kita asumsikan juga ada catatan karyawan duplikat di keempat file karena mengimpor data karyawan dari semua tabel database berulang kali. Lihat ilustrasi berikut.
The Map phasememproses setiap file masukan dan memberikan data karyawan dalam pasangan nilai kunci (<k, v>: <nama emp, gaji>). Lihat ilustrasi berikut.
The combiner phase(teknik pencarian) akan menerima masukan dari fase Peta sebagai pasangan nilai kunci dengan nama dan gaji karyawan. Dengan menggunakan teknik pencarian, penggabung akan memeriksa semua gaji karyawan untuk menemukan karyawan dengan gaji tertinggi di setiap file. Lihat cuplikan berikut.
<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary
if(v(second employee).salary > Max){
Max = v(salary);
}
else{
Continue checking;
}
Hasil yang diharapkan adalah sebagai berikut -
|
Reducer phase- Bentuk setiap file, Anda akan menemukan karyawan bergaji tertinggi. Untuk menghindari redundansi, periksa semua pasangan <k, v> dan hilangkan entri duplikat, jika ada. Algoritma yang sama digunakan di antara empat pasangan <k, v>, yang berasal dari empat file masukan. Hasil akhirnya harus sebagai berikut -
<gopal, 50000>
Pengindeksan
Biasanya pengindeksan digunakan untuk menunjuk ke data tertentu dan alamatnya. Ia melakukan pengindeksan batch pada file input untuk Mapper tertentu.
Teknik pengindeksan yang biasanya digunakan di MapReduce dikenal sebagai inverted index.Mesin pencari seperti Google dan Bing menggunakan teknik pengindeksan terbalik. Mari kita coba memahami cara kerja Pengindeksan dengan bantuan contoh sederhana.
Contoh
Teks berikut adalah masukan untuk pengindeksan terbalik. Di sini T [0], T [1], dan t [2] adalah nama file dan isinya dalam tanda kutip ganda.
T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"
Setelah menerapkan algoritma Pengindeksan, kami mendapatkan keluaran berikut -
"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}
Di sini "a": {2} menyiratkan istilah "a" muncul di file T [2]. Demikian pula, "adalah": {0, 1, 2} menyiratkan istilah "adalah" muncul di file T [0], T [1], dan T [2].
TF-IDF
TF-IDF adalah algoritma pengolah teks yang merupakan kependekan dari Term Frequency - Inverse Document Frequency. Ini adalah salah satu algoritma analisis web yang umum. Di sini, istilah 'frekuensi' mengacu pada berapa kali sebuah istilah muncul dalam dokumen.
Frekuensi Jangka (TF)
Ini mengukur seberapa sering istilah tertentu muncul dalam dokumen. Ini dihitung dengan berapa kali kata muncul dalam dokumen dibagi dengan jumlah kata dalam dokumen itu.
TF(the) = (Number of times term the ‘the’ appears in a document) / (Total number of terms in the document)
Frekuensi Dokumen Terbalik (IDF)
Ini mengukur pentingnya suatu istilah. Ini dihitung dengan jumlah dokumen dalam database teks dibagi dengan jumlah dokumen di mana istilah tertentu muncul.
Saat menghitung TF, semua istilah dianggap sama pentingnya. Artinya, TF menghitung frekuensi istilah untuk kata-kata normal seperti "adalah", "a", "apa", dll. Jadi, kita perlu mengetahui istilah yang sering digunakan saat meningkatkan yang jarang, dengan menghitung yang berikut -
IDF(the) = log_e(Total number of documents / Number of documents with term ‘the’ in it).
Algoritme dijelaskan di bawah ini dengan bantuan contoh kecil.
Contoh
Pertimbangkan dokumen yang berisi 1000 kata, dimana kata tersebut hivemuncul 50 kali. TF untukhive kemudian (50/1000) = 0,05.
Sekarang, anggap kita memiliki 10 juta dokumen dan kata hivemuncul di 1000 ini. Kemudian, IDF dihitung sebagai log (10.000.000 / 1.000) = 4.
Bobot TF-IDF adalah hasil kali dari jumlah ini - 0,05 × 4 = 0,20.
MapReduce hanya berfungsi pada sistem operasi rasa Linux dan dilengkapi dengan Kerangka Hadoop. Kita perlu melakukan langkah-langkah berikut untuk menginstal framework Hadoop.
Memverifikasi Instalasi JAVA
Java harus diinstal di sistem Anda sebelum menginstal Hadoop. Gunakan perintah berikut untuk memeriksa apakah Anda telah menginstal Java di sistem Anda.
$ java –version
Jika Java sudah diinstal di sistem Anda, Anda akan melihat respons berikut -
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
Jika Anda tidak menginstal Java di sistem Anda, ikuti langkah-langkah yang diberikan di bawah ini.
Menginstal Java
Langkah 1
Unduh versi terbaru Java dari tautan berikut - tautan ini .
Setelah mengunduh, Anda dapat menemukan file tersebut jdk-7u71-linux-x64.tar.gz di folder Unduhan Anda.
Langkah 2
Gunakan perintah berikut untuk mengekstrak konten jdk-7u71-linux-x64.gz.
$ cd Downloads/
$ ls jdk-7u71-linux-x64.gz $ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz
LANGKAH 3
Agar Java tersedia untuk semua pengguna, Anda harus memindahkannya ke lokasi "/ usr / local /". Pergi ke root dan ketik perintah berikut -
$ su
password:
# mv jdk1.7.0_71 /usr/local/java
# exit
LANGKAH 4
Untuk menyiapkan variabel PATH dan JAVA_HOME, tambahkan perintah berikut ke file ~ / .bashrc.
export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin
Terapkan semua perubahan ke sistem yang sedang berjalan.
$ source ~/.bashrc
LANGKAH 5
Gunakan perintah berikut untuk mengkonfigurasi alternatif Java -
# alternatives --install /usr/bin/java java usr/local/java/bin/java 2
# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2
# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2
# alternatives --set java usr/local/java/bin/java
# alternatives --set javac usr/local/java/bin/javac
# alternatives --set jar usr/local/java/bin/jar
Sekarang verifikasi penginstalan menggunakan perintah java -version dari terminal.
Memverifikasi Instalasi Hadoop
Hadoop harus diinstal di sistem Anda sebelum menginstal MapReduce. Mari kita verifikasi instalasi Hadoop menggunakan perintah berikut -
$ hadoop version
Jika Hadoop sudah terinstal di sistem Anda, maka Anda akan mendapatkan respons berikut -
Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4
Jika Hadoop tidak diinstal pada sistem Anda, lanjutkan dengan langkah-langkah berikut.
Mendownload Hadoop
Unduh Hadoop 2.4.1 dari Apache Software Foundation dan ekstrak kontennya menggunakan perintah berikut.
$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit
Menginstal Hadoop dalam mode Pseudo Distributed
Langkah-langkah berikut digunakan untuk menginstal Hadoop 2.4.1 dalam mode pseudo didistribusikan.
Langkah 1 - Menyiapkan Hadoop
Anda dapat menyetel variabel lingkungan Hadoop dengan menambahkan perintah berikut ke file ~ / .bashrc.
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
Terapkan semua perubahan ke sistem yang sedang berjalan.
$ source ~/.bashrc
Langkah 2 - Konfigurasi Hadoop
Anda dapat menemukan semua file konfigurasi Hadoop di lokasi "$ HADOOP_HOME / etc / hadoop". Anda perlu membuat perubahan yang sesuai pada file konfigurasi tersebut sesuai dengan infrastruktur Hadoop Anda.
$ cd $HADOOP_HOME/etc/hadoop
Untuk mengembangkan program Hadoop menggunakan Java, Anda harus mengatur ulang variabel lingkungan Java di hadoop-env.sh file dengan mengganti nilai JAVA_HOME dengan lokasi Java di sistem Anda.
export JAVA_HOME=/usr/local/java
Anda harus mengedit file berikut untuk mengkonfigurasi Hadoop -
- core-site.xml
- hdfs-site.xml
- yarn-site.xml
- mapred-site.xml
core-site.xml
core-site.xml berisi informasi berikut−
- Nomor port yang digunakan untuk instance Hadoop
- Memori dialokasikan untuk sistem file
- Batas memori untuk menyimpan data
- Ukuran buffer Baca / Tulis
Buka core-site.xml dan tambahkan properti berikut di antara tag <configuration> dan </configuration>.
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000 </value>
</property>
</configuration>
hdfs-site.xml
hdfs-site.xml berisi informasi berikut -
- Nilai data replikasi
- Jalur namenode
- Jalur datanode sistem file lokal Anda (tempat Anda ingin menyimpan infra Hadoop)
Mari kita asumsikan data berikut.
dfs.replication (data replication value) = 1
(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode
(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode
Buka file ini dan tambahkan properti berikut di antara tag <configuration>, </configuration>.
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
</property>
</configuration>
Note - Dalam file di atas, semua nilai properti ditentukan pengguna dan Anda dapat membuat perubahan sesuai dengan infrastruktur Hadoop Anda.
benang-situs.xml
File ini digunakan untuk mengkonfigurasi benang menjadi Hadoop. Buka file yarn-site.xml dan tambahkan properti berikut di antara tag <configuration>, </configuration>.
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
mapred-site.xml
File ini digunakan untuk menentukan kerangka MapReduce yang kita gunakan. Secara default, Hadoop berisi template benang-situs.xml. Pertama-tama, Anda perlu menyalin file dari mapred-site.xml.template ke file mapred-site.xml menggunakan perintah berikut.
$ cp mapred-site.xml.template mapred-site.xml
Buka file mapred-site.xml dan tambahkan properti berikut di antara tag <configuration>, </configuration>.
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
Memverifikasi Instalasi Hadoop
Langkah-langkah berikut digunakan untuk memverifikasi penginstalan Hadoop.
Langkah 1 - Penyiapan Node Nama
Siapkan namenode menggunakan perintah “hdfs namenode -format” sebagai berikut -
$ cd ~ $ hdfs namenode -format
Hasil yang diharapkan adalah sebagai berikut -
10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/
Langkah 2 - Memverifikasi Hadoop dfs
Jalankan perintah berikut untuk memulai sistem file Hadoop Anda.
$ start-dfs.sh
Output yang diharapkan adalah sebagai berikut -
10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]
Langkah 3 - Memverifikasi Skrip Benang
Perintah berikut digunakan untuk memulai skrip benang. Menjalankan perintah ini akan memulai benang daemon Anda.
$ start-yarn.sh
Output yang diharapkan adalah sebagai berikut -
starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out
Langkah 4 - Mengakses Hadoop di Browser
Nomor port default untuk mengakses Hadoop adalah 50070. Gunakan URL berikut untuk mendapatkan layanan Hadoop di browser Anda.
http://localhost:50070/
Tangkapan layar berikut menunjukkan browser Hadoop.
Langkah 5 - Verifikasi semua Aplikasi Cluster
Nomor port default untuk mengakses semua aplikasi cluster adalah 8088. Gunakan URL berikut untuk menggunakan layanan ini.
http://localhost:8088/
Tangkapan layar berikut menunjukkan browser cluster Hadoop.
Dalam bab ini, kita akan melihat lebih dekat pada kelas dan metodenya yang terlibat dalam operasi pemrograman MapReduce. Kami terutama akan tetap fokus pada hal-hal berikut -
- Antarmuka Konteks Pekerjaan
- Kelas Pekerjaan
- Kelas Mapper
- Kelas Peredam
Antarmuka Konteks Pekerjaan
Antarmuka JobContext adalah antarmuka super untuk semua kelas, yang mendefinisikan pekerjaan berbeda di MapReduce. Ini memberi Anda tampilan baca-saja dari pekerjaan yang disediakan untuk tugas saat mereka sedang berjalan.
Berikut ini adalah sub-antarmuka dari antarmuka JobContext.
S.No. | Deskripsi Subinterface |
---|---|
1. | MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> Mendefinisikan konteks yang diberikan ke Mapper. |
2. | ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> Mendefinisikan konteks yang diteruskan ke Reducer. |
Kelas pekerjaan adalah kelas utama yang mengimplementasikan antarmuka JobContext.
Kelas Pekerjaan
Kelas Pekerjaan adalah kelas paling penting dalam API MapReduce. Ini memungkinkan pengguna untuk mengonfigurasi pekerjaan, mengirimkannya, mengontrol eksekusinya, dan menanyakan status. Metode yang ditetapkan hanya berfungsi hingga tugas dikirim, setelah itu metode tersebut akan menampilkan IllegalStateException.
Biasanya, pengguna membuat aplikasi, menjelaskan berbagai aspek pekerjaan, lalu mengirimkan pekerjaan dan memantau kemajuannya.
Berikut adalah contoh cara mengirimkan pekerjaan -
// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);
// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
Konstruktor
Berikut adalah ringkasan konstruktor dari kelas Job.
S.No | Ringkasan Konstruktor |
---|---|
1 | Job() |
2 | Job(Konfigurasi konfigurasi) |
3 | Job(Konfigurasi conf, String jobName) |
Metode
Beberapa metode penting dari kelas Job adalah sebagai berikut -
S.No | Deskripsi Metode |
---|---|
1 | getJobName() Nama pekerjaan yang ditentukan pengguna. |
2 | getJobState() Mengembalikan status Pekerjaan saat ini. |
3 | isComplete() Memeriksa apakah pekerjaan telah selesai atau belum. |
4 | setInputFormatClass() Setel InputFormat untuk pekerjaan itu. |
5 | setJobName(String name) Menyetel nama pekerjaan yang ditentukan pengguna. |
6 | setOutputFormatClass() Mengatur Format Output untuk pekerjaan itu. |
7 | setMapperClass(Class) Mengatur Mapper untuk pekerjaan itu. |
8 | setReducerClass(Class) Mengatur Peredam untuk pekerjaan itu. |
9 | setPartitionerClass(Class) Mengatur Partisi untuk pekerjaan itu. |
10 | setCombinerClass(Class) Mengatur Pemadu untuk pekerjaan itu. |
Kelas Mapper
Kelas Mapper mendefinisikan pekerjaan Peta. Memetakan pasangan nilai kunci input ke satu set pasangan nilai kunci menengah. Peta adalah tugas individu yang mengubah catatan masukan menjadi catatan perantara. Rekaman perantara yang ditransformasikan tidak harus memiliki jenis yang sama dengan rekaman masukan. Pasangan masukan tertentu dapat dipetakan ke nol atau banyak pasangan keluaran.
metode
mapadalah metode paling menonjol dari kelas Mapper. Sintaksnya didefinisikan di bawah -
map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)
Metode ini dipanggil sekali untuk setiap pasangan nilai kunci dalam pemisahan input.
Kelas Peredam
Kelas Reducer mendefinisikan pekerjaan Reduce di MapReduce. Ini mengurangi sekumpulan nilai antara yang berbagi kunci menjadi sekumpulan nilai yang lebih kecil. Implementasi peredam bisa mengakses Konfigurasi untuk suatu pekerjaan melalui metode JobContext.getConfiguration (). Peredam memiliki tiga fase utama - Acak, Urutkan, dan Kurangi.
Shuffle - Reducer menyalin keluaran yang diurutkan dari setiap Pemeta menggunakan HTTP di seluruh jaringan.
Sort- Kerangka kerja menggabungkan-mengurutkan input Peredam berdasarkan kunci (karena Pemetaan berbeda mungkin memiliki keluaran kunci yang sama). Fase shuffle dan sortir terjadi secara bersamaan, yaitu saat output diambil, mereka digabungkan.
Reduce - Dalam fase ini metode reduce (Object, Iterable, Context) dipanggil untuk setiap <key, (kumpulan nilai)> dalam input yang diurutkan.
metode
reduceadalah metode paling menonjol dari kelas Reducer. Sintaksnya didefinisikan di bawah -
reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)
Metode ini dipanggil sekali untuk setiap kunci pada kumpulan pasangan kunci-nilai.
MapReduce adalah kerangka kerja yang digunakan untuk menulis aplikasi guna memproses data dalam jumlah besar pada kelompok besar perangkat keras komoditas dengan cara yang andal. Bab ini memandu Anda mempelajari pengoperasian MapReduce dalam kerangka kerja Hadoop menggunakan Java.
Algoritma MapReduce
Umumnya paradigma MapReduce didasarkan pada pengiriman program pengurangan peta ke komputer di mana data aktual berada.
Selama tugas MapReduce, Hadoop mengirim tugas Map dan Reduce ke server yang sesuai di cluster.
Kerangka kerja ini mengelola semua detail penyaluran data seperti mengeluarkan tugas, memverifikasi penyelesaian tugas, dan menyalin data di sekitar cluster di antara node.
Sebagian besar komputasi terjadi pada node dengan data pada disk lokal yang mengurangi lalu lintas jaringan.
Setelah menyelesaikan tugas tertentu, cluster mengumpulkan dan mengurangi data untuk membentuk hasil yang sesuai, dan mengirimkannya kembali ke server Hadoop.
Input dan Output (Perspektif Java)
Kerangka kerja MapReduce beroperasi pada pasangan nilai-kunci, yaitu kerangka kerja memandang masukan ke pekerjaan sebagai satu set pasangan nilai-kunci dan menghasilkan satu set pasangan nilai-kunci sebagai keluaran dari pekerjaan, yang mungkin dari jenis yang berbeda.
Kelas kunci dan nilai harus dapat diserialkan oleh kerangka kerja dan karenanya, diperlukan untuk mengimplementasikan antarmuka Writable. Selain itu, kelas kunci harus mengimplementasikan antarmuka WritableComparable untuk memfasilitasi pengurutan berdasarkan kerangka kerja.
Baik format input dan output dari pekerjaan MapReduce dalam bentuk pasangan nilai-kunci -
(Masukan) <k1, v1> -> peta -> <k2, v2> -> kurangi -> <k3, v3> (Output).
Memasukkan | Keluaran | |
---|---|---|
Peta | <k1, v1> | daftar (<k2, v2>) |
Mengurangi | <k2, daftar (v2)> | daftar (<k3, v3>) |
Implementasi MapReduce
Tabel berikut menunjukkan data mengenai konsumsi listrik suatu organisasi. Tabel tersebut mencakup konsumsi listrik bulanan dan rata-rata tahunan selama lima tahun berturut-turut.
Jan | Feb | Merusak | Apr | Mungkin | Jun | Jul | Agustus | Sep | Okt | Nov | Des | Rata-rata | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Kita perlu menulis aplikasi untuk memproses data masukan dalam tabel yang diberikan untuk menemukan tahun penggunaan maksimum, tahun penggunaan minimum, dan seterusnya. Tugas ini mudah bagi programmer dengan jumlah record yang terbatas, karena mereka hanya akan menulis logika untuk menghasilkan keluaran yang diperlukan, dan meneruskan data ke aplikasi tertulis.
Sekarang mari kita naikkan skala data masukan. Asumsikan kita harus menganalisis konsumsi listrik dari semua industri skala besar di negara bagian tertentu. Saat kami menulis aplikasi untuk memproses data massal tersebut,
Mereka akan membutuhkan banyak waktu untuk dieksekusi.
Akan ada lalu lintas jaringan yang padat saat kita memindahkan data dari sumber ke server jaringan.
Untuk mengatasi masalah ini, kami memiliki kerangka kerja MapReduce.
Memasukan data
Data di atas disimpan sebagai sample.txtdan diberikan sebagai masukan. File input terlihat seperti yang ditunjukkan di bawah ini.
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Contoh Program
Program berikut untuk data sampel menggunakan kerangka kerja MapReduce.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Simpan program di atas ke dalam ProcessUnits.java. Kompilasi dan eksekusi program diberikan di bawah ini.
Kompilasi dan Eksekusi Program ProcessUnits
Mari kita asumsikan bahwa kita berada di direktori home dari pengguna Hadoop (misalnya / home / hadoop).
Ikuti langkah-langkah yang diberikan di bawah ini untuk mengkompilasi dan menjalankan program di atas.
Step 1 - Gunakan perintah berikut untuk membuat direktori untuk menyimpan kelas java yang dikompilasi.
$ mkdir units
Step 2- Unduh Hadoop-core-1.2.1.jar, yang digunakan untuk mengkompilasi dan menjalankan program MapReduce. Unduh jar dari mvnrepository.com . Mari kita asumsikan folder unduhan adalah / home / hadoop /.
Step 3 - Perintah berikut digunakan untuk mengkompilasi file ProcessUnits.java program dan membuat toples untuk program tersebut.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 - Perintah berikut digunakan untuk membuat direktori input di HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - Perintah berikut digunakan untuk menyalin file input bernama sample.txt di direktori input HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 - Perintah berikut digunakan untuk memverifikasi file di direktori input
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - Perintah berikut digunakan untuk menjalankan aplikasi Eleunit_max dengan mengambil file input dari direktori input.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Tunggu beberapa saat hingga file dieksekusi. Setelah dieksekusi, output berisi sejumlah input split, tugas Peta, tugas Peredam, dll.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 - Perintah berikut digunakan untuk memverifikasi file yang dihasilkan di folder keluaran.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - Perintah berikut digunakan untuk melihat keluaran dalam Part-00000mengajukan. File ini dibuat oleh HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Berikut adalah output yang dihasilkan oleh program MapReduce -
1981 | 34 |
1984 | 40 |
1985 | 45 |
Step 10 - Perintah berikut digunakan untuk menyalin folder output dari HDFS ke sistem file lokal.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop
Pemartisi bekerja seperti kondisi dalam memproses set data masukan. Fase partisi berlangsung setelah fase Peta dan sebelum fase Mengurangi.
Jumlah pemartisi sama dengan jumlah reduksi. Itu artinya seorang pemartisi akan membagi data sesuai dengan jumlah reduksi. Oleh karena itu, data yang dikirimkan dari satu pemartisi diproses oleh satu Peredam.
Partisi
Pemartisi mempartisi pasangan nilai kunci dari keluaran Peta menengah. Ini mempartisi data menggunakan kondisi yang ditentukan pengguna, yang berfungsi seperti fungsi hash. Jumlah total partisi sama dengan jumlah tugas Peredam untuk pekerjaan itu. Mari kita ambil contoh untuk memahami cara kerja pemartisi.
Implementasi MapReduce Partitioner
Demi kenyamanan, mari kita asumsikan kita memiliki tabel kecil bernama Karyawan dengan data berikut. Kami akan menggunakan data sampel ini sebagai kumpulan data masukan kami untuk mendemonstrasikan cara kerja pemartisi.
Indo | Nama | Usia | Jenis kelamin | Gaji |
---|---|---|---|---|
1201 | gopal | 45 | Pria | 50.000 |
1202 | manisha | 40 | Perempuan | 50.000 |
1203 | khalil | 34 | Pria | 30.000 |
1204 | prasanth | 30 | Pria | 30.000 |
1205 | kiran | 20 | Pria | 40.000 |
1206 | laxmi | 25 | Perempuan | 35.000 |
1207 | bhavya | 20 | Perempuan | 15.000 |
1208 | reshma | 19 | Perempuan | 15.000 |
1209 | kranthi | 22 | Pria | 22.000 |
1210 | Satish | 24 | Pria | 25.000 |
1211 | Krishna | 25 | Pria | 25.000 |
1212 | Arshad | 28 | Pria | 20.000 |
1213 | lavanya | 18 | Perempuan | 8.000 |
Kami harus menulis aplikasi untuk memproses kumpulan data masukan untuk menemukan karyawan dengan gaji tertinggi menurut jenis kelamin di berbagai kelompok usia (misalnya, di bawah 20, antara 21 hingga 30, di atas 30).
Memasukan data
Data di atas disimpan sebagai input.txt di direktori “/ home / hadoop / hadoopPartitioner” dan diberikan sebagai masukan.
1201 | gopal | 45 | Pria | 50000 |
1202 | manisha | 40 | Perempuan | 51000 |
1203 | khaleel | 34 | Pria | 30000 |
1204 | prasanth | 30 | Pria | 31000 |
1205 | kiran | 20 | Pria | 40000 |
1206 | laxmi | 25 | Perempuan | 35000 |
1207 | bhavya | 20 | Perempuan | 15000 |
1208 | reshma | 19 | Perempuan | 14000 |
1209 | kranthi | 22 | Pria | 22000 |
1210 | Satish | 24 | Pria | 25000 |
1211 | Krishna | 25 | Pria | 26000 |
1212 | Arshad | 28 | Pria | 20000 |
1213 | lavanya | 18 | Perempuan | 8000 |
Berdasarkan masukan yang diberikan, berikut penjelasan algoritmik program tersebut.
Tugas Peta
Tugas peta menerima pasangan nilai kunci sebagai input sementara kita memiliki data teks dalam file teks. Input untuk tugas peta ini adalah sebagai berikut -
Input - Kuncinya adalah pola seperti "kunci khusus + nama file + nomor baris" (contoh: key = @ input1) dan nilainya adalah data di baris itu (contoh: nilai = 1201 \ t gopal \ t 45 \ t Pria \ t 50000).
Method - Pengoperasian tugas peta ini adalah sebagai berikut -
Membaca value (record data), yang datang sebagai nilai input dari daftar argumen dalam sebuah string.
Menggunakan fungsi split, pisahkan jenis kelamin dan simpan dalam variabel string.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
Kirim informasi gender dan data catatan value sebagai pasangan nilai kunci keluaran dari tugas peta ke partition task.
context.write(new Text(gender), new Text(value));
Ulangi semua langkah di atas untuk semua catatan di file teks.
Output - Anda akan mendapatkan data jenis kelamin dan nilai data catatan sebagai pasangan nilai kunci.
Tugas Partisi
Tugas pemartisi menerima pasangan nilai kunci dari tugas peta sebagai inputnya. Partisi berarti membagi data menjadi beberapa segmen. Menurut kriteria partisi bersyarat yang diberikan, data pasangan nilai kunci yang dimasukkan dapat dibagi menjadi tiga bagian berdasarkan kriteria usia.
Input - Seluruh data dalam kumpulan pasangan nilai-kunci.
key = Nilai bidang jenis kelamin dalam catatan.
nilai = Nilai data catatan utuh dari jenis kelamin itu.
Method - Proses logika partisi berjalan sebagai berikut.
- Baca nilai bidang usia dari input key-value pair.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Cek nilai umur dengan ketentuan sebagai berikut.
- Usia kurang dari atau sama dengan 20 tahun
- Usia Lebih dari 20 dan Kurang dari atau sama dengan 30.
- Usia Lebih dari 30.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- Seluruh data pasangan nilai kunci tersegmentasi menjadi tiga kumpulan pasangan nilai kunci. Reducer bekerja secara individual pada setiap koleksi.
Kurangi Tugas
Jumlah tugas pemartisi sama dengan jumlah tugas peredam. Di sini kami memiliki tiga tugas pemartisi dan karenanya kami memiliki tiga tugas Reducer untuk dieksekusi.
Input - Reducer akan mengeksekusi tiga kali dengan koleksi key-value pair yang berbeda.
key = nilai bidang gender dalam catatan.
nilai = seluruh data catatan jenis kelamin itu.
Method - Logika berikut akan diterapkan pada setiap koleksi.
- Baca nilai bidang Gaji dari setiap catatan.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
Periksa gaji dengan variabel maks. Jika str [4] adalah gaji maksimum, maka tetapkan str [4] ke max, jika tidak, lewati langkah tersebut.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Ulangi Langkah 1 dan 2 untuk setiap koleksi kunci (Pria & Wanita adalah koleksi kunci). Setelah melakukan tiga langkah ini, Anda akan menemukan satu gaji maksimal dari koleksi kunci Pria dan satu gaji maksimal dari koleksi kunci Wanita.
context.write(new Text(key), new IntWritable(max));
Output- Terakhir, Anda akan mendapatkan sekumpulan data pasangan nilai kunci dalam tiga koleksi kelompok usia yang berbeda. Ini berisi gaji maksimal dari koleksi Pria dan gaji maksimal dari koleksi Wanita di masing-masing kelompok umur.
Setelah menjalankan tugas Map, Partitioner, dan Reduce, tiga kumpulan data key-value pair disimpan dalam tiga file berbeda sebagai output.
Ketiga tugas tersebut diperlakukan sebagai pekerjaan MapReduce. Persyaratan dan spesifikasi berikut dari pekerjaan ini harus ditentukan dalam Konfigurasi -
- Nama Pekerjaan
- Format Input dan Output dari kunci dan nilai
- Kelas individu untuk tugas Map, Reduce, dan Partitioner
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Contoh Program
Program berikut menunjukkan bagaimana mengimplementasikan pemartisi untuk kriteria yang diberikan dalam program MapReduce.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
Simpan kode di atas sebagai PartitionerExample.javadi “/ home / hadoop / hadoopPartitioner”. Kompilasi dan eksekusi program diberikan di bawah ini.
Kompilasi dan Eksekusi
Mari kita asumsikan bahwa kita berada di direktori home dari pengguna Hadoop (misalnya, / home / hadoop).
Ikuti langkah-langkah yang diberikan di bawah ini untuk mengkompilasi dan menjalankan program di atas.
Step 1- Unduh Hadoop-core-1.2.1.jar, yang digunakan untuk mengkompilasi dan menjalankan program MapReduce. Anda dapat mengunduh jar dari mvnrepository.com .
Mari kita asumsikan folder yang diunduh adalah "/ home / hadoop / hadoopPartitioner"
Step 2 - Perintah berikut digunakan untuk menyusun program PartitionerExample.java dan membuat toples untuk program tersebut.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .
Step 3 - Gunakan perintah berikut untuk membuat direktori input di HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - Gunakan perintah berikut untuk menyalin file input bernama input.txt di direktori input HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - Gunakan perintah berikut untuk memverifikasi file di direktori input.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - Gunakan perintah berikut untuk menjalankan aplikasi Gaji teratas dengan mengambil file input dari direktori input.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Tunggu beberapa saat hingga file dieksekusi. Setelah dieksekusi, output berisi sejumlah input split, tugas peta, dan tugas Reducer.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - Gunakan perintah berikut untuk memverifikasi file yang dihasilkan di folder keluaran.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Anda akan menemukan output dalam tiga file karena Anda menggunakan tiga pemartisi dan tiga Reducer dalam program Anda.
Step 8 - Gunakan perintah berikut untuk melihat keluarannya Part-00000mengajukan. File ini dibuat oleh HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
Gunakan perintah berikut untuk melihat keluarannya Part-00001 mengajukan.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
Gunakan perintah berikut untuk melihat keluarannya Part-00002 mengajukan.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000
A Combiner, juga dikenal sebagai a semi-reducer, adalah kelas opsional yang beroperasi dengan menerima masukan dari kelas Peta dan kemudian meneruskan pasangan nilai kunci keluaran ke kelas Peredam.
Fungsi utama dari sebuah Combiner adalah untuk meringkas catatan keluaran peta dengan kunci yang sama. Output (kumpulan nilai-kunci) dari penggabung akan dikirim melalui jaringan ke tugas Reducer aktual sebagai input.
Penggabung
Kelas Combiner digunakan di antara kelas Map dan kelas Reduce untuk mengurangi volume transfer data antara Map dan Reduce. Biasanya, output dari tugas peta besar dan data yang ditransfer ke tugas pengurangan tinggi.
Diagram tugas MapReduce berikut memperlihatkan FASE COMBINER.
Bagaimana Combiner Bekerja?
Berikut adalah ringkasan singkat tentang cara kerja MapReduce Combiner -
Penggabung tidak memiliki antarmuka yang telah ditentukan dan harus mengimplementasikan metode reduce () antarmuka Reducer.
Penggabung beroperasi pada setiap kunci keluaran peta. Ini harus memiliki jenis nilai kunci keluaran yang sama dengan kelas Reducer.
Penggabung dapat menghasilkan informasi ringkasan dari kumpulan data besar karena menggantikan keluaran Peta asli.
Meskipun, Combiner bersifat opsional namun membantu memisahkan data menjadi beberapa grup untuk fase Reduce, yang membuatnya lebih mudah untuk diproses.
Implementasi MapReduce Combiner
Contoh berikut memberikan ide teoritis tentang penggabung. Mari kita asumsikan kita memiliki file teks input berikut bernamainput.txt untuk MapReduce.
What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance
Fase penting dari program MapReduce dengan Combiner dibahas di bawah ini.
Rekam Pembaca
Ini adalah fase pertama MapReduce di mana Pembaca Rekaman membaca setiap baris dari file teks masukan sebagai teks dan menghasilkan keluaran sebagai pasangan nilai kunci.
Input - Teks baris demi baris dari file input.
Output- Membentuk pasangan nilai kunci. Berikut ini adalah kumpulan pasangan kunci-nilai yang diharapkan.
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
Fase Peta
Fase Map mengambil masukan dari Pembaca Rekaman, memprosesnya, dan menghasilkan keluaran sebagai kumpulan pasangan nilai kunci lainnya.
Input - Pasangan nilai kunci berikut adalah input yang diambil dari Pembaca Rekaman.
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
Fase Map membaca setiap pasangan nilai kunci, membagi setiap kata dari nilai menggunakan StringTokenizer, memperlakukan setiap kata sebagai kunci dan jumlah kata tersebut sebagai nilai. Potongan kode berikut menunjukkan kelas Mapper dan fungsi peta.
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Output - Output yang diharapkan adalah sebagai berikut -
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
Fase Penggabung
Fase Penggabung mengambil setiap pasangan nilai kunci dari fase Peta, memprosesnya, dan menghasilkan keluaran sebagai key-value collection pasangan.
Input - Pasangan nilai kunci berikut adalah input yang diambil dari fase Peta.
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
Fase Penggabung membaca setiap pasangan nilai kunci, menggabungkan kata-kata umum sebagai kunci dan nilai sebagai kumpulan. Biasanya, kode dan operasi untuk Combiner mirip dengan Reducer. Berikut adalah potongan kode untuk deklarasi kelas Mapper, Combiner dan Reducer.
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
Output - Output yang diharapkan adalah sebagai berikut -
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Fase Peredam
Fase Reducer mengambil setiap pasangan kumpulan nilai kunci dari fase Penggabung, memprosesnya, dan meneruskan keluaran sebagai pasangan nilai kunci. Perhatikan bahwa fungsionalitas Penggabung sama dengan Peredam.
Input - Pasangan nilai kunci berikut adalah input yang diambil dari fase Penggabung.
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Fase Reducer membaca setiap pasangan nilai kunci. Berikut adalah potongan kode untuk Combiner.
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Output - Output yang diharapkan dari fase Reducer adalah sebagai berikut -
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Penulis Rekam
Ini adalah fase terakhir MapReduce di mana Record Writer menulis setiap pasangan nilai kunci dari fase Reducer dan mengirimkan output sebagai teks.
Input - Setiap pasangan nilai kunci dari fase Reducer bersama dengan format Output.
Output- Ini memberi Anda pasangan nilai kunci dalam format teks. Berikut adalah keluaran yang diharapkan.
What 3
do 2
you 2
mean 1
by 1
Object 1
know 1
about 1
Java 3
is 1
Virtual 1
Machine 1
How 1
enabled 1
High 1
Performance 1
Contoh Program
Blok kode berikut menghitung jumlah kata dalam sebuah program.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Simpan program di atas sebagai WordCount.java. Kompilasi dan eksekusi program diberikan di bawah ini.
Kompilasi dan Eksekusi
Mari kita asumsikan bahwa kita berada di direktori home pengguna Hadoop (misalnya, / home / hadoop).
Ikuti langkah-langkah yang diberikan di bawah ini untuk mengkompilasi dan menjalankan program di atas.
Step 1 - Gunakan perintah berikut untuk membuat direktori untuk menyimpan kelas java yang dikompilasi.
$ mkdir units
Step 2- Unduh Hadoop-core-1.2.1.jar, yang digunakan untuk mengkompilasi dan menjalankan program MapReduce. Anda dapat mengunduh jar dari mvnrepository.com .
Mari kita asumsikan folder yang diunduh adalah / home / hadoop /.
Step 3 - Gunakan perintah berikut untuk mengkompilasi file WordCount.java program dan membuat toples untuk program tersebut.
$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .
Step 4 - Gunakan perintah berikut untuk membuat direktori input di HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - Gunakan perintah berikut untuk menyalin file input bernama input.txt di direktori input HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir
Step 6 - Gunakan perintah berikut untuk memverifikasi file di direktori input.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - Gunakan perintah berikut untuk menjalankan aplikasi hitung kata dengan mengambil file input dari direktori input.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Tunggu beberapa saat hingga file dieksekusi. Setelah dieksekusi, output berisi sejumlah input split, tugas Peta, dan tugas Peredam.
Step 8 - Gunakan perintah berikut untuk memverifikasi file yang dihasilkan di folder keluaran.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - Gunakan perintah berikut untuk melihat keluarannya Part-00000mengajukan. File ini dibuat oleh HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Berikut adalah keluaran yang dihasilkan oleh program MapReduce.
What 3
do 2
you 2
mean 1
by 1
Object 1
know 1
about 1
Java 3
is 1
Virtual 1
Machine 1
How 1
enabled 1
High 1
Performance 1
Bab ini menjelaskan administrasi Hadoop yang mencakup administrasi HDFS dan MapReduce.
Administrasi HDFS mencakup pemantauan struktur file HDFS, lokasi, dan file yang diperbarui.
Administrasi MapReduce termasuk memantau daftar aplikasi, konfigurasi node, status aplikasi, dll.
Pemantauan HDFS
HDFS (Hadoop Distributed File System) berisi direktori pengguna, file input, dan file output. Gunakan perintah MapReduce,put dan get, untuk menyimpan dan mengambil.
Setelah memulai kerangka kerja Hadoop (daemon) dengan meneruskan perintah "start-all.sh" pada "/ $ HADOOP_HOME / sbin", berikan URL berikut ke browser "http: // localhost: 50070". Anda harus melihat layar berikut di browser Anda.
Tangkapan layar berikut menunjukkan cara menelusuri HDFS.
Tangkapan layar berikut menunjukkan struktur file HDFS. Ini menunjukkan file di direktori "/ user / hadoop".
Tangkapan layar berikut menunjukkan informasi Datanode di cluster. Di sini Anda dapat menemukan satu node dengan konfigurasi dan kapasitasnya.
MapReduce Job Monitoring
Aplikasi MapReduce adalah kumpulan pekerjaan (pekerjaan Peta, Penggabung, Pemartisi, dan Kurangi pekerjaan). Adalah wajib untuk memantau dan memelihara hal-hal berikut -
- Konfigurasi datanode dimana aplikasi cocok.
- Jumlah datanoda dan resource yang digunakan per aplikasi.
Untuk memantau semua hal ini, kita harus memiliki antarmuka pengguna. Setelah memulai kerangka kerja Hadoop dengan meneruskan perintah "start-all.sh" pada "/ $ HADOOP_HOME / sbin", berikan URL berikut ke browser "http: // localhost: 8080". Anda harus melihat layar berikut di browser Anda.
Pada gambar di atas, penunjuk tangan ada di ID aplikasi. Cukup klik di atasnya untuk menemukan layar berikut di browser Anda. Ini menjelaskan hal berikut -
Pada pengguna mana aplikasi saat ini sedang berjalan
Nama aplikasi
Jenis aplikasi itu
Status saat ini, status akhir
Waktu mulai aplikasi, berlalu (waktu selesai), jika selesai pada saat pemantauan
Sejarah aplikasi ini, yaitu informasi log
Dan terakhir, informasi node, yaitu node yang berpartisipasi dalam menjalankan aplikasi.
Tangkapan layar berikut menunjukkan detail aplikasi tertentu -
Tangkapan layar berikut menjelaskan informasi node yang sedang berjalan. Di sini, tangkapan layar hanya berisi satu node. Sebuah penunjuk tangan menunjukkan alamat localhost dari node yang sedang berjalan.