MapReduce - реализация Hadoop

MapReduce - это платформа, которая используется для написания приложений для надежной обработки огромных объемов данных на больших кластерах обычного оборудования. В этой главе вы познакомитесь с работой MapReduce в среде Hadoop с использованием Java.

Алгоритм MapReduce

Обычно парадигма MapReduce основана на отправке программ сокращения карты на компьютеры, где находятся фактические данные.

  • Во время задания MapReduce Hadoop отправляет задачи Map и Reduce на соответствующие серверы в кластере.

  • Платформа управляет всеми деталями передачи данных, такими как выдача задач, проверка завершения задач и копирование данных по кластеру между узлами.

  • Большая часть вычислений происходит на узлах с данными на локальных дисках, что снижает сетевой трафик.

  • После выполнения данной задачи кластер собирает и сокращает данные, чтобы сформировать соответствующий результат, и отправляет их обратно на сервер Hadoop.

Входы и выходы (перспектива Java)

Платформа MapReduce работает с парами «ключ-значение», то есть она рассматривает входные данные для задания как набор пар «ключ-значение» и создает набор пар «ключ-значение» в качестве выходных данных задания, предположительно различных типов.

Классы ключей и значений должны быть сериализуемыми платформой, и, следовательно, требуется реализовать интерфейс Writable. Кроме того, ключевые классы должны реализовать интерфейс WritableComparable для облегчения сортировки фреймворком.

И входной, и выходной формат задания MapReduce имеют форму пар ключ-значение:

(Вход) <k1, v1> -> map -> <k2, v2> -> reduce -> <k3, v3> (Выход).

Ввод Вывод
карта <k1, v1> список (<k2, v2>)
Уменьшить <k2, список (v2)> список (<k3, v3>)

Реализация MapReduce

В следующей таблице приведены данные о потреблении электроэнергии в организации. Таблица включает в себя ежемесячное потребление электроэнергии и среднегодовое значение за пять лет подряд.

Янв Фев Мар Апр май Июн Июл Авг Сен Октябрь Ноя Декабрь Средн.
1979 г. 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 г. 26 27 28 28 28 30 31 год 31 год 31 год 30 30 30 29
1981 31 год 32 32 32 33 34 35 год 36 36 34 34 34 34
1984 39 38 39 39 39 41 год 42 43 40 39 38 38 40
1985 г. 38 39 39 39 39 41 год 41 год 41 год 00 40 39 39 45

Нам нужно написать приложения для обработки входных данных в данной таблице, чтобы найти год максимального использования, год минимального использования и так далее. Эта задача проста для программистов с ограниченным количеством записей, поскольку они просто напишут логику для получения требуемого вывода и передадут данные в написанное приложение.

Давайте теперь увеличим масштаб входных данных. Предположим, мы должны проанализировать потребление электроэнергии всеми крупными отраслями промышленности конкретного государства. Когда мы пишем приложения для обработки таких массивов данных,

  • На их выполнение уйдет много времени.

  • Когда мы перемещаем данные из источника на сетевой сервер, будет интенсивный сетевой трафик.

Для решения этих проблем у нас есть фреймворк MapReduce.

Входные данные

Приведенные выше данные сохраняются как sample.txtи дан как вход. Входной файл выглядит так, как показано ниже.

1979 г. 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 г. 26 27 28 28 28 30 31 год 31 год 31 год 30 30 30 29
1981 31 год 32 32 32 33 34 35 год 36 36 34 34 34 34
1984 39 38 39 39 39 41 год 42 43 40 39 38 38 40
1985 г. 38 39 39 39 39 41 год 41 год 41 год 00 40 39 39 45

Пример программы

Следующая программа для демонстрационных данных использует платформу MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Сохраните указанную выше программу в ProcessUnits.java. Компиляция и выполнение программы приведены ниже.

Составление и выполнение программы ProcessUnits

Предположим, мы находимся в домашнем каталоге пользователя Hadoop (например, / home / hadoop).

Следуйте инструкциям ниже, чтобы скомпилировать и выполнить указанную выше программу.

Step 1 - Используйте следующую команду, чтобы создать каталог для хранения скомпилированных классов Java.

$ mkdir units

Step 2- Загрузите Hadoop-core-1.2.1.jar, который используется для компиляции и выполнения программы MapReduce. Загрузите банку с mvnrepository.com . Предположим, что папка загрузки - / home / hadoop /.

Step 3 - Следующие команды используются для компиляции ProcessUnits.java программу и создать банку для программы.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - Следующая команда используется для создания входного каталога в HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Следующая команда используется для копирования входного файла с именем sample.txt во входном каталоге HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - Следующая команда используется для проверки файлов во входном каталоге

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Следующая команда используется для запуска приложения Eleunit_max путем получения входных файлов из входного каталога.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Подождите, пока файл не запустится. После выполнения вывод содержит несколько входных разделений, задач карты, задач редуктора и т. Д.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - Следующая команда используется для проверки результирующих файлов в выходной папке.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Следующая команда используется для просмотра вывода в Part-00000файл. Этот файл создается HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Ниже приведен вывод, созданный программой MapReduce:

1981 34
1984 40
1985 г. 45

Step 10 - Следующая команда используется для копирования выходной папки из HDFS в локальную файловую систему.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop