MapReduce - разделитель
Разделитель работает как условие при обработке входного набора данных. Фаза разделения происходит после фазы карты и перед фазой сокращения.
Количество разделителей равно количеству редукторов. Это означает, что разделитель разделит данные по количеству редукторов. Следовательно, данные, передаваемые из одного модуля разделения, обрабатываются одним редуктором.
Разделитель
Разделитель разделяет пары ключ-значение промежуточных выходов карты. Он разделяет данные с использованием определенного пользователем условия, которое работает как хеш-функция. Общее количество разделов совпадает с количеством задач Reducer для задания. Давайте рассмотрим пример, чтобы понять, как работает разделитель.
Реализация MapReduce Partitioner
Для удобства предположим, что у нас есть небольшая таблица под названием «Сотрудник» со следующими данными. Мы будем использовать этот образец данных в качестве входного набора данных, чтобы продемонстрировать, как работает секционер.
Я бы | имя | Возраст | Пол | Зарплата |
---|---|---|---|---|
1201 | гопал | 45 | мужчина | 50 000 |
1202 | Manisha | 40 | женский | 50 000 |
1203 | Халил | 34 | мужчина | 30 000 |
1204 | прасант | 30 | мужчина | 30 000 |
1205 | Киран | 20 | мужчина | 40 000 |
1206 | Laxmi | 25 | женский | 35 000 |
1207 | бхавья | 20 | женский | 15 000 |
1208 | решма | 19 | женский | 15 000 |
1209 | Кранти | 22 | мужчина | 22 000 |
1210 | Satish | 24 | мужчина | 25 000 |
1211 | Кришна | 25 | мужчина | 25 000 |
1212 | Аршад | 28 | мужчина | 20 000 |
1213 | Лаванья | 18 | женский | 8 000 |
Мы должны написать приложение для обработки входного набора данных, чтобы найти самого высокооплачиваемого сотрудника по полу в разных возрастных группах (например, ниже 20, от 21 до 30, старше 30).
Входные данные
Приведенные выше данные сохраняются как input.txt в каталоге «/ home / hadoop / hadoopPartitioner» и указан в качестве входных данных.
1201 | гопал | 45 | мужчина | 50000 |
1202 | Manisha | 40 | женский | 51000 |
1203 | Халил | 34 | мужчина | 30000 |
1204 | прасант | 30 | мужчина | 31000 |
1205 | Киран | 20 | мужчина | 40000 |
1206 | Laxmi | 25 | женский | 35000 |
1207 | бхавья | 20 | женский | 15000 |
1208 | решма | 19 | женский | 14000 |
1209 | Кранти | 22 | мужчина | 22000 |
1210 | Satish | 24 | мужчина | 25000 |
1211 | Кришна | 25 | мужчина | 26000 |
1212 | Аршад | 28 | мужчина | 20000 |
1213 | Лаванья | 18 | женский | 8000 |
На основе введенных данных ниже приводится алгоритмическое объяснение программы.
Задачи карты
Задача карты принимает пары ключ-значение в качестве входных данных, пока у нас есть текстовые данные в текстовом файле. Вход для этой задачи карты следующий:
Input - Ключом будет шаблон, такой как «любой специальный ключ + имя файла + номер строки» (пример: key = @ input1), а значением будут данные в этой строке (пример: value = 1201 \ t gopal \ t 45 \ t Male \ t 50000).
Method - Работа этой задачи карты выглядит следующим образом -
Прочтите value (данные записи), который поступает как входное значение из списка аргументов в строке.
Используя функцию разделения, разделите пол и сохраните в строковой переменной.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
Отправьте информацию о поле и данные записи value как выходную пару "ключ-значение" из задачи карты в partition task.
context.write(new Text(gender), new Text(value));
Повторите все вышеперечисленные шаги для всех записей в текстовом файле.
Output - Вы получите данные о поле и значении данных в виде пар "ключ-значение".
Задача разделителя
Задача разделителя принимает пары ключ-значение из задачи карты в качестве входных данных. Разделение подразумевает разделение данных на сегменты. В соответствии с заданными условными критериями разделов входные парные данные "ключ-значение" можно разделить на три части на основе критериев возраста.
Input - Все данные в наборе пар "ключ-значение".
key = значение поля Gender в записи.
value = Значение данных всей записи этого пола.
Method - Процесс логики разбиения выполняется следующим образом.
- Считайте значение поля возраста из входной пары "ключ-значение".
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Проверьте значение возраста при следующих условиях.
- Возраст меньше или равен 20
- Возраст от 20 до 30 лет.
- Возраст старше 30 лет.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- Все данные пар "ключ-значение" сегментируются на три коллекции пар "ключ-значение". Редуктор работает индивидуально с каждой коллекцией.
Сократить количество задач
Количество задач разделителя равно количеству задач редуктора. Здесь у нас есть три задачи разметки, и, следовательно, у нас есть три задачи редуктора, которые нужно выполнить.
Input - Редуктор будет выполняться три раза с разным набором пар ключ-значение.
ключ = значение поля пола в записи.
значение = все данные записи этого пола.
Method - К каждой коллекции будет применяться следующая логика.
- Прочтите значение поля Salary каждой записи.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
Проверьте зарплату с помощью переменной max. Если str [4] - максимальная зарплата, тогда присвойте str [4] max, в противном случае пропустите шаг.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Повторите шаги 1 и 2 для каждой передачи ключей (передача ключей осуществляется для мужчин и женщин). После выполнения этих трех шагов вы найдете одну максимальную зарплату при выдаче мужских ключей и одну максимальную зарплату при выдаче женских ключей.
context.write(new Text(key), new IntWritable(max));
Output- Наконец, вы получите набор данных пар "ключ-значение" в трех коллекциях для разных возрастных групп. Он содержит максимальную зарплату из мужской коллекции и максимальную зарплату из женской коллекции в каждой возрастной группе соответственно.
После выполнения задач Map, Partitioner и Reduce три коллекции данных пары ключ-значение сохраняются в трех разных файлах в качестве выходных данных.
Все три задачи рассматриваются как задания MapReduce. Следующие требования и спецификации этих работ должны быть указаны в конфигурациях:
- Название работы
- Форматы ввода и вывода ключей и значений
- Индивидуальные классы для задач Map, Reduce и Partitioner
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Пример программы
Следующая программа показывает, как реализовать разделители для заданных критериев в программе MapReduce.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
Сохраните приведенный выше код как PartitionerExample.javaв «/ home / hadoop / hadoopPartitioner». Компиляция и выполнение программы приведены ниже.
Компиляция и исполнение
Предположим, мы находимся в домашнем каталоге пользователя Hadoop (например, / home / hadoop).
Следуйте инструкциям ниже, чтобы скомпилировать и выполнить указанную выше программу.
Step 1- Загрузите Hadoop-core-1.2.1.jar, который используется для компиляции и выполнения программы MapReduce. Вы можете скачать банку с mvnrepository.com .
Предположим, что загруженная папка - «/ home / hadoop / hadoopPartitioner».
Step 2 - Следующие команды используются для компиляции программы PartitionerExample.java и создание фляги для программы.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 - Используйте следующую команду для создания входного каталога в HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - Используйте следующую команду, чтобы скопировать входной файл с именем input.txt во входном каталоге HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - Используйте следующую команду для проверки файлов во входном каталоге.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - Используйте следующую команду для запуска приложения Top salary, взяв входные файлы из входного каталога.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Подождите, пока файл не запустится. После выполнения выходные данные содержат ряд разделений ввода, задач карты и задач Reducer.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - Используйте следующую команду, чтобы проверить полученные файлы в выходной папке.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Вы найдете вывод в трех файлах, потому что вы используете в своей программе три разделителя и три редуктора.
Step 8 - Используйте следующую команду, чтобы увидеть результат в Part-00000файл. Этот файл создается HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
Используйте следующую команду, чтобы увидеть вывод в Part-00001 файл.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
Используйте следующую команду, чтобы увидеть вывод в Part-00002 файл.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000