MapReduce - विभाजनकर्ता
एक विभाजनक एक इनपुट डेटासेट के प्रसंस्करण में एक शर्त की तरह काम करता है। विभाजन चरण मैप चरण के बाद और चरण के चरण से पहले होता है।
विभाजनकर्ताओं की संख्या Reducers की संख्या के बराबर है। इसका मतलब है कि एक विभाजनक डेटा को रीड्यूसर की संख्या के अनुसार विभाजित करेगा। इसलिए, एक एकल पार्टीशनर से पारित डेटा एकल Reducer द्वारा संसाधित किया जाता है।
विभाजनर
एक विभाजक मध्यवर्ती मानचित्र-आउटपुट के प्रमुख-मूल्य जोड़े को विभाजित करता है। यह उपयोगकर्ता-परिभाषित स्थिति का उपयोग करके डेटा का विभाजन करता है, जो हैश फ़ंक्शन की तरह काम करता है। विभाजन की कुल संख्या नौकरी के लिए Reducer कार्यों की संख्या के समान है। हमें समझने के लिए एक उदाहरण लेते हैं कि विभाजनकर्ता कैसे काम करता है।
MapReduce Partitioner कार्यान्वयन
सुविधा के लिए, मान लें कि हमारे पास निम्न डेटा के साथ कर्मचारी नामक एक छोटी तालिका है। हम इस नमूना डेटा का उपयोग हमारे इनपुट डेटासेट के रूप में करेंगे कि यह प्रदर्शित करने के लिए कि विभाजन कैसे काम करता है।
ईद | नाम | उम्र | लिंग | वेतन |
---|---|---|---|---|
1201 | गोपाल | 45 | पुरुष | 50,000 |
1202 | मनीषा | 40 | महिला | 50,000 |
1203 | खलील | 34 | पुरुष | 30,000 |
1204 | प्रशांत | 30 | पुरुष | 30,000 |
1205 | किरण | 20 | पुरुष | 40,000 |
1206 | लक्ष्मी | 25 | महिला | 35,000 |
1207 | Bhavya | 20 | महिला | 15,000 |
1208 | रेशमा | 19 | महिला | 15,000 |
1209 | क्रांति | 22 | पुरुष | 22,000 |
1210 | सतीश | 24 | पुरुष | 25,000 |
1211 | कृष्णा | 25 | पुरुष | 25,000 |
1212 | अरशद | 28 | पुरुष | 20,000 |
1213 | लावण्या | 18 | महिला | 8000 |
हमें विभिन्न आयु समूहों (उदाहरण के लिए, 20 से नीचे, 21 से 30 के बीच, 30 से ऊपर) में लिंग द्वारा उच्चतम वेतनभोगी कर्मचारी को खोजने के लिए इनपुट डेटासेट को संसाधित करने के लिए एक आवेदन लिखना होगा।
इनपुट डेटा
उपरोक्त डेटा के रूप में सहेजा गया है input.txt "/ home / hadoop / hadoopPartitioner" निर्देशिका में और इनपुट के रूप में दिया गया है।
1201 | गोपाल | 45 | पुरुष | 50000 |
1202 | मनीषा | 40 | महिला | 51000 |
1203 | खलील | 34 | पुरुष | 30000 |
1204 | प्रशांत | 30 | पुरुष | 31000 |
1205 | किरण | 20 | पुरुष | 40000 |
1206 | लक्ष्मी | 25 | महिला | 35000 |
1207 | Bhavya | 20 | महिला | 15000 |
1208 | रेशमा | 19 | महिला | 14000 |
1209 | क्रांति | 22 | पुरुष | 22000 |
1210 | सतीश | 24 | पुरुष | 25000 |
1211 | कृष्णा | 25 | पुरुष | 26000 |
1212 | अरशद | 28 | पुरुष | 20000 |
1213 | लावण्या | 18 | महिला | 8000 |
दिए गए इनपुट के आधार पर, कार्यक्रम की एल्गोरिथम व्याख्या निम्नलिखित है।
मानचित्र कार्य
मैप टास्क इनपुट के रूप में की-वैल्यू पेयर को स्वीकार करता है जबकि हमारे पास टेक्स्ट फाइल में टेक्स्ट डेटा होता है। इस मानचित्र कार्य के लिए इनपुट निम्नानुसार है -
Input - कुंजी एक पैटर्न होगा जैसे "कोई विशेष कुंजी + फ़ाइल नाम + लाइन नंबर" (उदाहरण: कुंजी = @ input1) और मान उस रेखा में डेटा होगा (उदाहरण: मान = 1201 \ t गोपाल \ t 45 \ t पुरुष \ t 50000)।
Method - इस मानचित्र कार्य का संचालन निम्नानुसार है -
को पढ़िए value (रिकॉर्ड डेटा), जो एक स्ट्रिंग में तर्क सूची से इनपुट मान के रूप में आता है।
स्प्लिट फ़ंक्शन का उपयोग करके, लिंग और स्टोर को एक स्ट्रिंग चर में अलग करें।
String[] str = value.toString().split("\t", -3);
String gender=str[3];
लिंग की जानकारी और रिकॉर्ड डेटा भेजें value मानचित्र कार्य से आउटपुट की-वैल्यू पेयर के रूप में partition task।
context.write(new Text(gender), new Text(value));
पाठ फ़ाइल के सभी रिकॉर्ड के लिए उपरोक्त सभी चरणों को दोहराएं।
Output - आपको लिंग डेटा और की-वैल्यू जोड़े के रूप में रिकॉर्ड डेटा मूल्य मिलेगा।
विभाजनकर्ता कार्य
विभाजनकर्ता कार्य अपने इनपुट के रूप में मानचित्र कार्य से मुख्य-मूल्य जोड़े को स्वीकार करता है। विभाजन का तात्पर्य डेटा को खंडों में विभाजित करना है। विभाजन के दिए गए सशर्त मानदंडों के अनुसार, इनपुट कुंजी-मूल्य युग्मित डेटा को आयु मानदंडों के आधार पर तीन भागों में विभाजित किया जा सकता है।
Input - कुंजी-मूल्य जोड़े के संग्रह में पूरा डेटा।
की = रिकॉर्ड में जेंडर फील्ड वैल्यू।
मूल्य = पूरे लिंग का रिकॉर्ड डेटा मूल्य।
Method - विभाजन तर्क की प्रक्रिया निम्नानुसार चलती है।
- इनपुट की-वैल्यू जोड़ी से आयु क्षेत्र मान पढ़ें।
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
निम्नलिखित स्थितियों के साथ आयु मान की जाँच करें।
- 20 से कम या बराबर उम्र
- आयु २० से अधिक और ३० के बराबर या उससे कम।
- आयु 30 से अधिक।
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- कुंजी-मूल्य जोड़े के पूरे डेटा को कुंजी-मूल्य जोड़े के तीन संग्रह में विभाजित किया गया है। Reducer प्रत्येक संग्रह पर व्यक्तिगत रूप से काम करता है।
कार्य कम करें
विभाजनकर्ता कार्यों की संख्या Reducer कार्यों की संख्या के बराबर है। यहां हमारे पास तीन विभाजन कार्य हैं और इसलिए हमारे पास निष्पादन के लिए तीन Reducer कार्य हैं।
Input - Reducer तीन बार कुंजी-मूल्य जोड़े के विभिन्न संग्रह के साथ निष्पादित करेगा।
रिकॉर्ड में कुंजी = लिंग क्षेत्र मान।
मूल्य = उस लिंग का पूरा रिकॉर्ड डेटा।
Method - प्रत्येक संग्रह पर निम्नलिखित तर्क लागू किए जाएंगे।
- प्रत्येक रिकॉर्ड का वेतन क्षेत्र मान पढ़ें।
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
अधिकतम चर के साथ वेतन की जांच करें। यदि str [4] अधिकतम वेतन है, तो str [4] को अधिकतम असाइन करें, अन्यथा चरण छोड़ें।
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
प्रत्येक प्रमुख संग्रह के लिए चरण 1 और 2 दोहराएं (पुरुष और महिला प्रमुख संग्रह हैं)। इन तीन चरणों को निष्पादित करने के बाद, आपको पुरुष कुंजी संग्रह से एक अधिकतम वेतन और महिला कुंजी संग्रह से एक अधिकतम वेतन मिलेगा।
context.write(new Text(key), new IntWritable(max));
Output- अंत में, आपको विभिन्न आयु समूहों के तीन संग्रह में कुंजी-मूल्य जोड़ी डेटा का एक सेट मिलेगा। इसमें पुरुष संग्रह से अधिकतम वेतन और प्रत्येक आयु वर्ग में महिला संग्रह से अधिकतम वेतन क्रमशः है।
मैप, पार्टिशनर और रिड्यूस कार्यों को अंजाम देने के बाद, कुंजी-मूल्य जोड़ी डेटा के तीन संग्रह आउटपुट के रूप में तीन अलग-अलग फाइलों में संग्रहीत किए जाते हैं।
सभी तीन कार्यों को मैपरेडेस नौकरियों के रूप में माना जाता है। इन नौकरियों की निम्नलिखित आवश्यकताओं और विनिर्देशों को विन्यास में निर्दिष्ट किया जाना चाहिए -
- कार्य नाम
- कुंजी और मूल्यों के इनपुट और आउटपुट प्रारूप
- मानचित्र, कमी और विभाजन कार्यों के लिए अलग-अलग कक्षाएं
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
उदाहरण कार्यक्रम
निम्न प्रोग्राम दिखाता है कि MapReduce प्रोग्राम में दिए गए मानदंडों के लिए पार्टीशनर्स को कैसे लागू किया जाए।
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
उपरोक्त कोड को इस प्रकार सेव करें PartitionerExample.java"/ घर / हैडूप / हडूपार्टिशनर" में। कार्यक्रम का संकलन और निष्पादन नीचे दिया गया है।
संकलन और निष्पादन
आइए हम मान लें कि हम Hadoop उपयोगकर्ता की होम निर्देशिका में हैं (उदाहरण के लिए, / home / hadoop)।
उपरोक्त कार्यक्रम को संकलित करने और निष्पादित करने के लिए नीचे दिए गए चरणों का पालन करें।
Step 1- Hadoop-core-1.2.1.jar डाउनलोड करें, जो MapReduce प्रोग्राम को संकलित करने और निष्पादित करने के लिए उपयोग किया जाता है। आप jvn को mvnrepository.com से डाउनलोड कर सकते हैं ।
मान लें कि डाउनलोड किया गया फोल्डर “/ home / hadoop / hadoopPartitioner” है
Step 2 - प्रोग्राम को कंपाइल करने के लिए निम्न कमांड का उपयोग किया जाता है PartitionerExample.java और कार्यक्रम के लिए एक जार बनाना।
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 - एचडीएफएस में इनपुट डायरेक्टरी बनाने के लिए निम्न कमांड का उपयोग करें।
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - नामित इनपुट फ़ाइल को कॉपी करने के लिए निम्न कमांड का उपयोग करें input.txt एचडीएफएस की इनपुट डायरेक्टरी में।
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - इनपुट डायरेक्टरी में फाइलों को सत्यापित करने के लिए निम्न कमांड का उपयोग करें।
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - इनपुट डायरेक्टरी से इनपुट फाइल लेकर टॉप सैलरी एप्लिकेशन को चलाने के लिए निम्न कमांड का उपयोग करें।
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
फ़ाइल निष्पादित होने तक कुछ समय तक प्रतीक्षा करें। निष्पादन के बाद, आउटपुट में कई इनपुट विभाजन, मानचित्र कार्य और Reducer कार्य शामिल हैं।
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - आउटपुट फ़ोल्डर में परिणामी फाइलों को सत्यापित करने के लिए निम्न कमांड का उपयोग करें।
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
आपको तीन फ़ाइलों में आउटपुट मिलेगा क्योंकि आप अपने प्रोग्राम में तीन पार्टीटर्स और तीन रेड्यूसर का उपयोग कर रहे हैं।
Step 8 - आउटपुट को देखने के लिए निम्न कमांड का उपयोग करें Part-00000फ़ाइल। यह फाइल HDFS द्वारा जनरेट की गई है।
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
आउटपुट को देखने के लिए निम्न कमांड का उपयोग करें Part-00001 फ़ाइल।
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
आउटपुट को देखने के लिए निम्न कमांड का उपयोग करें Part-00002 फ़ाइल।
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000