MapReduce - Hadoop Uygulaması
MapReduce, büyük hacimli verileri geniş ticari donanım kümelerinde güvenilir bir şekilde işlemek için uygulamalar yazmak için kullanılan bir çerçevedir. Bu bölüm, Java kullanarak Hadoop çerçevesinde MapReduce'un çalıştırılmasına götürür.
MapReduce Algoritması
Genel olarak MapReduce paradigması, gerçek verilerin bulunduğu bilgisayarlara harita azaltma programları göndermeye dayanır.
Bir MapReduce işi sırasında Hadoop, Eşleme ve Azaltma görevlerini kümedeki uygun sunuculara gönderir.
Çerçeve, görevler yayınlama, görev tamamlamayı doğrulama ve düğümler arasındaki küme etrafındaki verileri kopyalama gibi veri aktarımının tüm ayrıntılarını yönetir.
Hesaplamanın çoğu, ağ trafiğini azaltan yerel disklerdeki verilerle düğümlerde gerçekleşir.
Belirli bir görevi tamamladıktan sonra, küme verileri toplar ve uygun bir sonuç oluşturmak için azaltır ve Hadoop sunucusuna geri gönderir.
Girdiler ve Çıktılar (Java Perspective)
MapReduce çerçevesi anahtar-değer çiftleri üzerinde çalışır, yani çerçeve işin girdisini bir anahtar-değer çifti kümesi olarak görür ve işin çıktısı olarak, muhtemelen farklı tiplerde bir dizi anahtar-değer çifti üretir.
Anahtar ve değer sınıflarının çerçeve tarafından serileştirilebilir olması gerekir ve bu nedenle Yazılabilir arabirimi uygulamak gerekir. Ek olarak, anahtar sınıfların çerçeveye göre sıralamayı kolaylaştırmak için WritableComparable arabirimini uygulaması gerekir.
MapReduce işinin hem girdi hem de çıktı biçimi anahtar-değer çiftleri biçimindedir -
(Giriş) <k1, v1> -> harita -> <k2, v2> -> azalt -> <k3, v3> (Çıkış).
Giriş | Çıktı | |
---|---|---|
Harita | <k1, v1> | liste (<k2, v2>) |
Azalt | <k2, liste (v2)> | liste (<k3, v3>) |
MapReduce Uygulaması
Aşağıdaki tablo, bir kuruluşun elektrik tüketimi ile ilgili verileri göstermektedir. Tablo, aylık elektrik tüketimini ve arka arkaya beş yıl için yıllık ortalamayı içerir.
Oca | Şubat | Mar | Nis | Mayıs | Haz | Tem | Ağu | Eylül | Ekim | Kasım | Aralık | Ort. | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Maksimum kullanım yılını, minimum kullanım yılını vb. Bulmak için verilen tablodaki giriş verilerini işlemek için uygulamalar yazmamız gerekir. Bu görev, gerekli çıktıyı üretmek için mantığı yazacakları ve verileri yazılı uygulamaya geçirecekleri için, sınırlı sayıda kaydı olan programcılar için kolaydır.
Şimdi giriş verilerinin ölçeğini artıralım. Belirli bir eyaletteki tüm büyük ölçekli endüstrilerin elektrik tüketimini analiz etmemiz gerektiğini varsayalım. Bu tür toplu verileri işlemek için uygulamalar yazdığımızda,
Yürütmek çok zaman alacak.
Verileri kaynaktan ağ sunucusuna taşıdığımızda yoğun ağ trafiği olacaktır.
Bu sorunları çözmek için MapReduce çerçevesine sahibiz.
Giriş Verileri
Yukarıdaki veriler şu şekilde kaydedilir: sample.txtve girdi olarak verilir. Girdi dosyası aşağıda gösterildiği gibi görünür.
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Örnek Program
Örnek veriler için aşağıdaki program MapReduce çerçevesini kullanır.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Yukarıdaki programı şuraya kaydedin: ProcessUnits.java. Programın derlenmesi ve çalıştırılması aşağıda verilmiştir.
ProcessUnits Programının Derlenmesi ve Yürütülmesi
Hadoop kullanıcısının ana dizininde olduğumuzu varsayalım (örneğin / home / hadoop).
Yukarıdaki programı derlemek ve çalıştırmak için aşağıda verilen adımları izleyin.
Step 1 - Derlenmiş java sınıflarını depolamak için bir dizin oluşturmak için aşağıdaki komutu kullanın.
$ mkdir units
Step 2- MapReduce programını derlemek ve yürütmek için kullanılan Hadoop-core-1.2.1.jar dosyasını indirin. Kavanozu mvnrepository.com adresinden indirin . İndirme klasörünün / home / hadoop / olduğunu varsayalım.
Step 3 - Aşağıdaki komutlar, ProcessUnits.java program ve program için bir jar oluşturun.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 - Aşağıdaki komut, HDFS'de bir giriş dizini oluşturmak için kullanılır.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - Aşağıdaki komut, adlı giriş dosyasını kopyalamak için kullanılır sample.txt HDFS'nin giriş dizininde.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 - Giriş dizinindeki dosyaları doğrulamak için aşağıdaki komut kullanılır
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - Aşağıdaki komut, girdi dosyalarını girdi dizininden alarak Eleunit_max uygulamasını çalıştırmak için kullanılır.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Dosya yürütülene kadar bir süre bekleyin. Yürütmeden sonra, çıktı bir dizi giriş bölmesi, Harita görevleri, Azaltıcı görevleri vb. İçerir.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 - Aşağıdaki komut, çıktı klasöründe ortaya çıkan dosyaları doğrulamak için kullanılır.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - Aşağıdaki komut, çıktıyı görmek için kullanılır. Part-00000dosya. Bu dosya HDFS tarafından oluşturulmuştur.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
MapReduce programı tarafından üretilen çıktı aşağıdadır -
1981 | 34 |
1984 | 40 |
1985 | 45 |
Step 10 - Aşağıdaki komut, çıktı klasörünü HDFS'den yerel dosya sistemine kopyalamak için kullanılır.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop