MapReduce - विभाजनकर्ता

एक विभाजनक एक इनपुट डेटासेट के प्रसंस्करण में एक शर्त की तरह काम करता है। विभाजन चरण मैप चरण के बाद और चरण के चरण से पहले होता है।

विभाजनकर्ताओं की संख्या Reducers की संख्या के बराबर है। इसका मतलब है कि एक विभाजनक डेटा को रीड्यूसर की संख्या के अनुसार विभाजित करेगा। इसलिए, एक एकल पार्टीशनर से पारित डेटा एकल Reducer द्वारा संसाधित किया जाता है।

विभाजनर

एक विभाजक मध्यवर्ती मानचित्र-आउटपुट के प्रमुख-मूल्य जोड़े को विभाजित करता है। यह उपयोगकर्ता-परिभाषित स्थिति का उपयोग करके डेटा का विभाजन करता है, जो हैश फ़ंक्शन की तरह काम करता है। विभाजन की कुल संख्या नौकरी के लिए Reducer कार्यों की संख्या के समान है। हमें समझने के लिए एक उदाहरण लेते हैं कि विभाजनकर्ता कैसे काम करता है।

MapReduce Partitioner कार्यान्वयन

सुविधा के लिए, मान लें कि हमारे पास निम्न डेटा के साथ कर्मचारी नामक एक छोटी तालिका है। हम इस नमूना डेटा का उपयोग हमारे इनपुट डेटासेट के रूप में करेंगे कि यह प्रदर्शित करने के लिए कि विभाजन कैसे काम करता है।

ईद नाम उम्र लिंग वेतन
1201 गोपाल 45 पुरुष 50,000
1202 मनीषा 40 महिला 50,000
1203 खलील 34 पुरुष 30,000
1204 प्रशांत 30 पुरुष 30,000
1205 किरण 20 पुरुष 40,000
1206 लक्ष्मी 25 महिला 35,000
1207 Bhavya 20 महिला 15,000
1208 रेशमा 19 महिला 15,000
1209 क्रांति 22 पुरुष 22,000
1210 सतीश 24 पुरुष 25,000
1211 कृष्णा 25 पुरुष 25,000
1212 अरशद 28 पुरुष 20,000
1213 लावण्या 18 महिला 8000

हमें विभिन्न आयु समूहों (उदाहरण के लिए, 20 से नीचे, 21 से 30 के बीच, 30 से ऊपर) में लिंग द्वारा उच्चतम वेतनभोगी कर्मचारी को खोजने के लिए इनपुट डेटासेट को संसाधित करने के लिए एक आवेदन लिखना होगा।

इनपुट डेटा

उपरोक्त डेटा के रूप में सहेजा गया है input.txt "/ home / hadoop / hadoopPartitioner" निर्देशिका में और इनपुट के रूप में दिया गया है।

1201 गोपाल 45 पुरुष 50000
1202 मनीषा 40 महिला 51000
1203 खलील 34 पुरुष 30000
1204 प्रशांत 30 पुरुष 31000
1205 किरण 20 पुरुष 40000
1206 लक्ष्मी 25 महिला 35000
1207 Bhavya 20 महिला 15000
1208 रेशमा 19 महिला 14000
1209 क्रांति 22 पुरुष 22000
1210 सतीश 24 पुरुष 25000
1211 कृष्णा 25 पुरुष 26000
1212 अरशद 28 पुरुष 20000
1213 लावण्या 18 महिला 8000

दिए गए इनपुट के आधार पर, कार्यक्रम की एल्गोरिथम व्याख्या निम्नलिखित है।

मानचित्र कार्य

मैप टास्क इनपुट के रूप में की-वैल्यू पेयर को स्वीकार करता है जबकि हमारे पास टेक्स्ट फाइल में टेक्स्ट डेटा होता है। इस मानचित्र कार्य के लिए इनपुट निम्नानुसार है -

Input - कुंजी एक पैटर्न होगा जैसे "कोई विशेष कुंजी + फ़ाइल नाम + लाइन नंबर" (उदाहरण: कुंजी = @ input1) और मान उस रेखा में डेटा होगा (उदाहरण: मान = 1201 \ t गोपाल \ t 45 \ t पुरुष \ t 50000)।

Method - इस मानचित्र कार्य का संचालन निम्नानुसार है -

  • को पढ़िए value (रिकॉर्ड डेटा), जो एक स्ट्रिंग में तर्क सूची से इनपुट मान के रूप में आता है।

  • स्प्लिट फ़ंक्शन का उपयोग करके, लिंग और स्टोर को एक स्ट्रिंग चर में अलग करें।

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • लिंग की जानकारी और रिकॉर्ड डेटा भेजें value मानचित्र कार्य से आउटपुट की-वैल्यू पेयर के रूप में partition task

context.write(new Text(gender), new Text(value));
  • पाठ फ़ाइल के सभी रिकॉर्ड के लिए उपरोक्त सभी चरणों को दोहराएं।

Output - आपको लिंग डेटा और की-वैल्यू जोड़े के रूप में रिकॉर्ड डेटा मूल्य मिलेगा।

विभाजनकर्ता कार्य

विभाजनकर्ता कार्य अपने इनपुट के रूप में मानचित्र कार्य से मुख्य-मूल्य जोड़े को स्वीकार करता है। विभाजन का तात्पर्य डेटा को खंडों में विभाजित करना है। विभाजन के दिए गए सशर्त मानदंडों के अनुसार, इनपुट कुंजी-मूल्य युग्मित डेटा को आयु मानदंडों के आधार पर तीन भागों में विभाजित किया जा सकता है।

Input - कुंजी-मूल्य जोड़े के संग्रह में पूरा डेटा।

की = रिकॉर्ड में जेंडर फील्ड वैल्यू।

मूल्य = पूरे लिंग का रिकॉर्ड डेटा मूल्य।

Method - विभाजन तर्क की प्रक्रिया निम्नानुसार चलती है।

  • इनपुट की-वैल्यू जोड़ी से आयु क्षेत्र मान पढ़ें।
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • निम्नलिखित स्थितियों के साथ आयु मान की जाँच करें।

    • 20 से कम या बराबर उम्र
    • आयु २० से अधिक और ३० के बराबर या उससे कम।
    • आयु 30 से अधिक।
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- कुंजी-मूल्य जोड़े के पूरे डेटा को कुंजी-मूल्य जोड़े के तीन संग्रह में विभाजित किया गया है। Reducer प्रत्येक संग्रह पर व्यक्तिगत रूप से काम करता है।

कार्य कम करें

विभाजनकर्ता कार्यों की संख्या Reducer कार्यों की संख्या के बराबर है। यहां हमारे पास तीन विभाजन कार्य हैं और इसलिए हमारे पास निष्पादन के लिए तीन Reducer कार्य हैं।

Input - Reducer तीन बार कुंजी-मूल्य जोड़े के विभिन्न संग्रह के साथ निष्पादित करेगा।

रिकॉर्ड में कुंजी = लिंग क्षेत्र मान।

मूल्य = उस लिंग का पूरा रिकॉर्ड डेटा।

Method - प्रत्येक संग्रह पर निम्नलिखित तर्क लागू किए जाएंगे।

  • प्रत्येक रिकॉर्ड का वेतन क्षेत्र मान पढ़ें।
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • अधिकतम चर के साथ वेतन की जांच करें। यदि str [4] अधिकतम वेतन है, तो str [4] को अधिकतम असाइन करें, अन्यथा चरण छोड़ें।

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • प्रत्येक प्रमुख संग्रह के लिए चरण 1 और 2 दोहराएं (पुरुष और महिला प्रमुख संग्रह हैं)। इन तीन चरणों को निष्पादित करने के बाद, आपको पुरुष कुंजी संग्रह से एक अधिकतम वेतन और महिला कुंजी संग्रह से एक अधिकतम वेतन मिलेगा।

context.write(new Text(key), new IntWritable(max));

Output- अंत में, आपको विभिन्न आयु समूहों के तीन संग्रह में कुंजी-मूल्य जोड़ी डेटा का एक सेट मिलेगा। इसमें पुरुष संग्रह से अधिकतम वेतन और प्रत्येक आयु वर्ग में महिला संग्रह से अधिकतम वेतन क्रमशः है।

मैप, पार्टिशनर और रिड्यूस कार्यों को अंजाम देने के बाद, कुंजी-मूल्य जोड़ी डेटा के तीन संग्रह आउटपुट के रूप में तीन अलग-अलग फाइलों में संग्रहीत किए जाते हैं।

सभी तीन कार्यों को मैपरेडेस नौकरियों के रूप में माना जाता है। इन नौकरियों की निम्नलिखित आवश्यकताओं और विनिर्देशों को विन्यास में निर्दिष्ट किया जाना चाहिए -

  • कार्य नाम
  • कुंजी और मूल्यों के इनपुट और आउटपुट प्रारूप
  • मानचित्र, कमी और विभाजन कार्यों के लिए अलग-अलग कक्षाएं
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

उदाहरण कार्यक्रम

निम्न प्रोग्राम दिखाता है कि MapReduce प्रोग्राम में दिए गए मानदंडों के लिए पार्टीशनर्स को कैसे लागू किया जाए।

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

उपरोक्त कोड को इस प्रकार सेव करें PartitionerExample.java"/ घर / हैडूप / हडूपार्टिशनर" में। कार्यक्रम का संकलन और निष्पादन नीचे दिया गया है।

संकलन और निष्पादन

आइए हम मान लें कि हम Hadoop उपयोगकर्ता की होम निर्देशिका में हैं (उदाहरण के लिए, / home / hadoop)।

उपरोक्त कार्यक्रम को संकलित करने और निष्पादित करने के लिए नीचे दिए गए चरणों का पालन करें।

Step 1- Hadoop-core-1.2.1.jar डाउनलोड करें, जो MapReduce प्रोग्राम को संकलित करने और निष्पादित करने के लिए उपयोग किया जाता है। आप jvn को mvnrepository.com से डाउनलोड कर सकते हैं ।

मान लें कि डाउनलोड किया गया फोल्डर “/ home / hadoop / hadoopPartitioner” है

Step 2 - प्रोग्राम को कंपाइल करने के लिए निम्न कमांड का उपयोग किया जाता है PartitionerExample.java और कार्यक्रम के लिए एक जार बनाना।

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - एचडीएफएस में इनपुट डायरेक्टरी बनाने के लिए निम्न कमांड का उपयोग करें।

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - नामित इनपुट फ़ाइल को कॉपी करने के लिए निम्न कमांड का उपयोग करें input.txt एचडीएफएस की इनपुट डायरेक्टरी में।

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - इनपुट डायरेक्टरी में फाइलों को सत्यापित करने के लिए निम्न कमांड का उपयोग करें।

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - इनपुट डायरेक्टरी से इनपुट फाइल लेकर टॉप सैलरी एप्लिकेशन को चलाने के लिए निम्न कमांड का उपयोग करें।

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

फ़ाइल निष्पादित होने तक कुछ समय तक प्रतीक्षा करें। निष्पादन के बाद, आउटपुट में कई इनपुट विभाजन, मानचित्र कार्य और Reducer कार्य शामिल हैं।

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - आउटपुट फ़ोल्डर में परिणामी फाइलों को सत्यापित करने के लिए निम्न कमांड का उपयोग करें।

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

आपको तीन फ़ाइलों में आउटपुट मिलेगा क्योंकि आप अपने प्रोग्राम में तीन पार्टीटर्स और तीन रेड्यूसर का उपयोग कर रहे हैं।

Step 8 - आउटपुट को देखने के लिए निम्न कमांड का उपयोग करें Part-00000फ़ाइल। यह फाइल HDFS द्वारा जनरेट की गई है।

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

आउटपुट को देखने के लिए निम्न कमांड का उपयोग करें Part-00001 फ़ाइल।

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

आउटपुट को देखने के लिए निम्न कमांड का उपयोग करें Part-00002 फ़ाइल।

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000