MapReduce - Hadoop कार्यान्वयन
MapReduce एक ऐसा ढांचा है, जो विश्वसनीय तरीके से कमोडिटी हार्डवेयर के बड़े समूहों पर डेटा की बड़ी मात्रा को संसाधित करने के लिए एप्लिकेशन लिखने के लिए उपयोग किया जाता है। यह अध्याय जावा का उपयोग करके Hadoop ढांचे में MapReduce के संचालन के माध्यम से ले जाता है।
MapReduce एल्गोरिथम
आमतौर पर MapReduce paradigm उन मानचित्रों को कम करने वाले कार्यक्रमों को कंप्यूटर पर भेजने पर आधारित होता है जहाँ वास्तविक डेटा रहता है।
MapReduce जॉब के दौरान, Hadoop मैप्स को भेजता है और क्लस्टर में उपयुक्त सर्वर को टास्क कम करता है।
फ़्रेम डेटा-पासिंग के सभी विवरणों को प्रबंधित करता है जैसे कि कार्य जारी करना, कार्य पूरा करना, और नोड्स के बीच क्लस्टर के आसपास डेटा की प्रतिलिपि बनाना।
अधिकांश कंप्यूटिंग नेटवर्क ट्रैफ़िक को कम करने वाले स्थानीय डिस्क पर डेटा के साथ नोड्स पर होता है।
दिए गए कार्य को पूरा करने के बाद, क्लस्टर एक उचित परिणाम बनाने के लिए डेटा एकत्र करता है और कम करता है, और इसे Hadoop सर्वर पर वापस भेजता है।
इनपुट्स और आउटपुट (जावा परिप्रेक्ष्य)
MapReduce ढांचा कुंजी-मूल्य जोड़े पर संचालित होता है, अर्थात, फ्रेमवर्क नौकरी के इनपुट को कुंजी-मूल्य जोड़े के एक सेट के रूप में देखता है और विभिन्न प्रकारों के बोधगम्य रूप से नौकरी के आउटपुट के रूप में कुंजी-मूल्य जोड़ी का एक सेट तैयार करता है।
कुंजी और मूल्य वर्गों को फ्रेमवर्क द्वारा क्रमबद्ध किया जाना है और इसलिए, इसे लिखने योग्य इंटरफ़ेस को लागू करना आवश्यक है। इसके अतिरिक्त, प्रमुख वर्गों को WritableComparable इंटरफ़ेस को लागू करना है ताकि फ्रेमवर्क द्वारा छँटाई की सुविधा मिल सके।
MapReduce जॉब का इनपुट और आउटपुट फॉर्मेट दोनों ही मुख्य-मूल्य जोड़े के रूप में हैं -
(इनपुट) <k1, v1> -> नक्शा -> <k2, v2> -> कम -> <k3, v3> (आउटपुट)।
इनपुट | उत्पादन | |
---|---|---|
नक्शा | <k1, v1> | सूची (<k2, v2>) |
कम करना | <k2, सूची (v2)> | सूची (<k3, v3>) |
MapReduce कार्यान्वयन
निम्न तालिका एक संगठन की बिजली की खपत के बारे में डेटा दिखाती है। तालिका में मासिक विद्युत खपत और लगातार पांच वर्षों तक वार्षिक औसत शामिल है।
जनवरी | फ़रवरी | मार्च | अप्रैल | मई | जून | जुलाई | अगस्त | सितम्बर | अक्टूबर | नवम्बर | दिसम्बर | औसत | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
हमें अधिकतम उपयोग का वर्ष, न्यूनतम उपयोग का वर्ष, और इसी तरह के अन्य कार्यों को खोजने के लिए दिए गए तालिका में इनपुट डेटा को संसाधित करने के लिए एप्लिकेशन लिखना होगा। यह कार्य प्रोग्रामर के लिए अभिलेखों की परिमित राशि के लिए आसान है, क्योंकि वे केवल आवश्यक उत्पादन करने के लिए तर्क लिखेंगे, और लिखित एप्लिकेशन को डेटा पास करेंगे।
चलिए अब इनपुट डेटा के पैमाने को बढ़ाते हैं। मान लें कि हमें किसी विशेष राज्य के सभी बड़े पैमाने के उद्योगों की विद्युत खपत का विश्लेषण करना है। जब हम ऐसे बल्क डेटा को प्रोसेस करने के लिए एप्लिकेशन लिखते हैं,
उन्हें निष्पादित करने में बहुत समय लगेगा।
जब हम स्रोत से नेटवर्क सर्वर पर डेटा स्थानांतरित करेंगे तो भारी नेटवर्क ट्रैफ़िक होगा।
इन समस्याओं को हल करने के लिए, हमारे पास MapReduce ढांचा है।
इनपुट डेटा
उपरोक्त डेटा के रूप में सहेजा गया है sample.txtऔर इनपुट के रूप में दिया गया। इनपुट फ़ाइल नीचे दिखाए गए अनुसार दिखाई देती है।
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
उदाहरण कार्यक्रम
नमूना डेटा के लिए निम्न प्रोग्राम MapReduce ढांचे का उपयोग करता है।
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
उपरोक्त कार्यक्रम में सहेजें ProcessUnits.java। कार्यक्रम का संकलन और निष्पादन नीचे दिया गया है।
ProcessUnits प्रोग्राम का संकलन और निष्पादन
आइए हम मान लें कि हम Hadoop उपयोगकर्ता (जैसे / घर / हैडूप) की होम डायरेक्टरी में हैं।
उपरोक्त कार्यक्रम को संकलित करने और निष्पादित करने के लिए नीचे दिए गए चरणों का पालन करें।
Step 1 - संकलित जावा कक्षाओं को संग्रहीत करने के लिए निर्देशिका बनाने के लिए निम्न कमांड का उपयोग करें।
$ mkdir units
Step 2- Hadoop-core-1.2.1.jar डाउनलोड करें, जो MapReduce प्रोग्राम को संकलित करने और निष्पादित करने के लिए उपयोग किया जाता है। Jvn को mvnrepository.com से डाउनलोड करें । हमें लगता है कि डाउनलोड फ़ोल्डर / घर / हैडूप / है।
Step 3 - निम्नलिखित आदेशों को संकलित करने के लिए उपयोग किया जाता है ProcessUnits.java कार्यक्रम और कार्यक्रम के लिए एक जार बनाने के लिए।
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 - एचडीएफएस में इनपुट डायरेक्टरी बनाने के लिए निम्न कमांड का उपयोग किया जाता है।
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - निम्नलिखित कमांड का उपयोग नामित इनपुट फ़ाइल को कॉपी करने के लिए किया जाता है sample.txt एचडीएफएस की इनपुट डायरेक्टरी में।
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 - इनपुट डायरेक्टरी में फाइलों को सत्यापित करने के लिए निम्न कमांड का उपयोग किया जाता है
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - इनपुट डायरेक्टरी से इनपुट फाइल्स लेकर Eleunit_max एप्लिकेशन को चलाने के लिए निम्न कमांड का उपयोग किया जाता है।
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
फ़ाइल निष्पादित होने तक कुछ समय तक प्रतीक्षा करें। निष्पादन के बाद, आउटपुट में कई इनपुट विभाजन, मानचित्र कार्य, Reducer कार्य आदि होते हैं।
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 - आउटपुट फोल्डर में परिणामी फाइलों को सत्यापित करने के लिए निम्न कमांड का उपयोग किया जाता है।
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - आउटपुट को देखने के लिए निम्न कमांड का उपयोग किया जाता है Part-00000फ़ाइल। यह फाइल HDFS द्वारा जनरेट की गई है।
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
निम्नलिखित MapReduce कार्यक्रम द्वारा उत्पन्न उत्पादन है -
1981 | 34 |
1984 | 40 |
1985 | 45 |
Step 10 - एचडीएफएस से स्थानीय फ़ाइल सिस्टम में आउटपुट फ़ोल्डर की प्रतिलिपि बनाने के लिए निम्न कमांड का उपयोग किया जाता है।
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop