MapReduce - Bölümleyici
Bir bölümleyici, bir girdi veri kümesini işlemede bir koşul gibi çalışır. Bölme aşaması, Harita aşamasından sonra ve Azaltma aşamasından önce gerçekleşir.
Bölme sayısı, redüktör sayısına eşittir. Bu, bir bölümleyicinin verileri redüktör sayısına göre böleceği anlamına gelir. Bu nedenle, tek bir bölümleyiciden aktarılan veriler tek bir Redüktör tarafından işlenir.
Bölümleyici
Bir bölümleyici, ara Harita çıktılarının anahtar-değer çiftlerini bölümler. Karma işlevi gibi çalışan, kullanıcı tanımlı bir koşul kullanarak verileri bölümler. Toplam bölüm sayısı, iş için Reducer görevlerinin sayısıyla aynıdır. Bölümleyicinin nasıl çalıştığını anlamak için bir örnek alalım.
MapReduce Partitioner Uygulaması
Kolaylık olması açısından, aşağıdaki verileri içeren Çalışan adında küçük bir tablomuz olduğunu varsayalım. Bu örnek verileri, bölümleyicinin nasıl çalıştığını göstermek için girdi veri kümemiz olarak kullanacağız.
İD | İsim | Yaş | Cinsiyet | Maaş |
---|---|---|---|---|
1201 | gopal | 45 | Erkek | 50.000 |
1202 | Manisha | 40 | Kadın | 50.000 |
1203 | Khalil | 34 | Erkek | 30.000 |
1204 | prasant | 30 | Erkek | 30.000 |
1205 | Kiran | 20 | Erkek | 40.000 |
1206 | Laxmi | 25 | Kadın | 35.000 |
1207 | Bhavya | 20 | Kadın | 15.000 |
1208 | reshma | 19 | Kadın | 15.000 |
1209 | Kranthi | 22 | Erkek | 22.000 |
1210 | Satish | 24 | Erkek | 25.000 |
1211 | Krishna | 25 | Erkek | 25.000 |
1212 | Arshad | 28 | Erkek | 20.000 |
1213 | Lavanya | 18 | Kadın | 8.000 |
Farklı yaş gruplarında cinsiyete göre en yüksek maaşlı çalışanı bulmak için girdi veri setini işlemek için bir uygulama yazmalıyız (örneğin, 20'nin altı, 21-30 arası, 30'un üstü).
Giriş Verileri
Yukarıdaki veriler şu şekilde kaydedilir: input.txt "/ home / hadoop / hadoopPartitioner" dizininde ve girdi olarak verilir.
1201 | gopal | 45 | Erkek | 50000 |
1202 | Manisha | 40 | Kadın | 51000 |
1203 | Khaleel | 34 | Erkek | 30000 |
1204 | prasant | 30 | Erkek | 31000 |
1205 | Kiran | 20 | Erkek | 40000 |
1206 | Laxmi | 25 | Kadın | 35.000 |
1207 | Bhavya | 20 | Kadın | 15.000 |
1208 | reshma | 19 | Kadın | 14.000 |
1209 | Kranthi | 22 | Erkek | 22.000 |
1210 | Satish | 24 | Erkek | 25.000 |
1211 | Krishna | 25 | Erkek | 26.000 |
1212 | Arshad | 28 | Erkek | 20.000 |
1213 | Lavanya | 18 | Kadın | 8000 |
Verilen girdiye bağlı olarak, programın algoritmik açıklaması aşağıdadır.
Harita Görevleri
Harita görevi, metin verisini bir metin dosyasında tuttuğumuzda, anahtar-değer çiftlerini girdi olarak kabul eder. Bu harita görevinin girdisi aşağıdaki gibidir -
Input - Anahtar, "herhangi bir özel anahtar + dosya adı + satır numarası" (örnek: anahtar = @ girdi1) gibi bir kalıp olur ve değer, bu satırdaki veriler olur (örnek: değer = 1201 \ t gopal \ t 45 \ t Erkek \ t 50000).
Method - Bu harita görevinin çalışması aşağıdaki gibidir -
Okumak value (kayıt verileri), bir dizedeki bağımsız değişken listesinden giriş değeri olarak gelir.
Bölme işlevini kullanarak cinsiyeti ayırın ve bir dize değişkeninde saklayın.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
Cinsiyet bilgilerini ve kayıt verilerini gönderin value eşleme görevinden anahtar / değer çifti çıktısı olarak partition task.
context.write(new Text(gender), new Text(value));
Metin dosyasındaki tüm kayıtlar için yukarıdaki tüm adımları tekrarlayın.
Output - Cinsiyet verilerini ve kayıt veri değerini anahtar / değer çiftleri olarak alacaksınız.
Partitioner Görevi
Bölümleyici görevi, eşleme görevindeki anahtar-değer çiftlerini girdi olarak kabul eder. Bölümleme, verilerin bölümlere bölünmesi anlamına gelir. Bölümlerin verilen koşullu kriterlerine göre, giriş anahtar-değer eşleştirilmiş verileri yaş kriterine göre üç bölüme ayrılabilir.
Input - Anahtar / değer çiftlerinden oluşan bir koleksiyondaki verilerin tamamı.
key = Kayıttaki cinsiyet alanı değeri.
değer = Bu cinsiyetin tüm kayıt veri değeri.
Method - Bölümleme mantığı süreci aşağıdaki gibi çalışır.
- Giriş anahtar / değer çiftinden yaş alanı değerini okuyun.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Yaş değerini aşağıdaki koşullarla kontrol edin.
- 20'den küçük veya eşit yaş
- Yaş 20'den büyük ve 30'dan küçük veya 30'a eşit.
- Yaş 30'dan büyük.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- Anahtar / değer çiftlerinin tüm verileri, anahtar / değer çiftlerinin üç koleksiyonuna bölünür. Redüktör, her koleksiyonda ayrı ayrı çalışır.
Görevleri Azaltın
Bölme görevlerinin sayısı, düşürücü görevlerin sayısına eşittir. Burada üç bölümleyici görevimiz var ve dolayısıyla gerçekleştirilecek üç Redüktör görevimiz var.
Input - İndirgeyici, farklı anahtar / değer çiftleri koleksiyonuyla üç kez çalıştırılır.
kayıttaki anahtar = cinsiyet alanı değeri.
değer = o cinsiyetin tüm kayıt verileri.
Method - Her koleksiyona aşağıdaki mantık uygulanacaktır.
- Her kaydın Maaş alanı değerini okuyun.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
Maaşı maksimum değişkenle kontrol edin. Eğer str [4] maksimum maaş ise, str [4] 'ü max'a atayın, aksi takdirde adımı atlayın.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Her bir anahtar teslimi için Adım 1 ve 2'yi tekrarlayın (Erkek ve Kadın anahtar koleksiyonlardır). Bu üç adımı uyguladıktan sonra, Erkek anahtar koleksiyonundan bir maksimum maaş ve Kadın anahtar koleksiyonundan bir maksimum maaş bulacaksınız.
context.write(new Text(key), new IntWritable(max));
Output- Son olarak, farklı yaş gruplarından oluşan üç koleksiyonda bir dizi anahtar / değer çifti verisi alacaksınız. Sırasıyla her yaş grubundaki Erkek koleksiyonundan maksimum maaşı ve Kadın koleksiyonundan maksimum maaşı içerir.
Map, Partitioner ve Reduce görevlerini çalıştırdıktan sonra, anahtar-değer çifti verilerinin üç koleksiyonu çıktı olarak üç farklı dosyada depolanır.
Üç görevin tümü MapReduce işleri olarak değerlendirilir. Bu işlerin aşağıdaki gereksinimleri ve özellikleri Konfigürasyonlarda belirtilmelidir -
- İş adı
- Anahtarların ve değerlerin giriş ve çıkış formatları
- Map, Reduce ve Partitioner görevleri için ayrı sınıflar
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Örnek Program
Aşağıdaki program, bir MapReduce programında verilen kriterler için bölümleyicilerin nasıl uygulanacağını gösterir.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
Yukarıdaki kodu farklı kaydedin PartitionerExample.java"/ home / hadoop / hadoopPartitioner" içinde. Programın derlenmesi ve çalıştırılması aşağıda verilmiştir.
Derleme ve Yürütme
Hadoop kullanıcısının ana dizininde olduğumuzu varsayalım (örneğin, / home / hadoop).
Yukarıdaki programı derlemek ve yürütmek için aşağıda verilen adımları izleyin.
Step 1- MapReduce programını derlemek ve yürütmek için kullanılan Hadoop-core-1.2.1.jar dosyasını indirin. Kavanozu mvnrepository.com adresinden indirebilirsiniz .
İndirilen klasörün "/ home / hadoop / hadoopPartitioner" olduğunu varsayalım
Step 2 - Programı derlemek için aşağıdaki komutlar kullanılır PartitionerExample.java ve program için bir kavanoz oluşturmak.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 - HDFS'de bir giriş dizini oluşturmak için aşağıdaki komutu kullanın.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - adlı giriş dosyasını kopyalamak için aşağıdaki komutu kullanın input.txt HDFS'nin giriş dizininde.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - Giriş dizinindeki dosyaları doğrulamak için aşağıdaki komutu kullanın.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - Giriş dosyalarını giriş dizininden alarak En yüksek maaş uygulamasını çalıştırmak için aşağıdaki komutu kullanın.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Dosya yürütülene kadar bir süre bekleyin. Yürütmeden sonra, çıktı bir dizi giriş bölmesi, harita görevleri ve Redüktör görevleri içerir.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - Çıktı klasöründe ortaya çıkan dosyaları doğrulamak için aşağıdaki komutu kullanın.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Çıktıyı üç dosyada bulacaksınız çünkü programınızda üç bölümleyici ve üç Redüktör kullanıyorsunuz.
Step 8 - Çıktıyı görmek için aşağıdaki komutu kullanın Part-00000dosya. Bu dosya HDFS tarafından oluşturulmuştur.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
Çıktıyı görmek için aşağıdaki komutu kullanın. Part-00001 dosya.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
Çıktıyı görmek için aşağıdaki komutu kullanın. Part-00002 dosya.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000