MapReduce - Implementação Hadoop

MapReduce é uma estrutura usada para escrever aplicativos para processar grandes volumes de dados em grandes clusters de hardware comum de maneira confiável. Este capítulo o conduz pela operação de MapReduce na estrutura Hadoop usando Java.

Algoritmo MapReduce

Geralmente o paradigma MapReduce é baseado no envio de programas de redução de mapa para computadores onde os dados reais residem.

  • Durante um trabalho MapReduce, o Hadoop envia tarefas Map e Reduce para os servidores apropriados no cluster.

  • A estrutura gerencia todos os detalhes da passagem de dados, como emitir tarefas, verificar a conclusão da tarefa e copiar dados em todo o cluster entre os nós.

  • A maior parte da computação ocorre nos nós com dados em discos locais que reduzem o tráfego de rede.

  • Depois de concluir uma determinada tarefa, o cluster coleta e reduz os dados para formar um resultado apropriado e os envia de volta ao servidor Hadoop.

Entradas e saídas (perspectiva Java)

A estrutura MapReduce opera em pares de valor-chave, ou seja, a estrutura visualiza a entrada para a tarefa como um conjunto de pares de valor-chave e produz um conjunto de par de valor-chave como a saída da tarefa, possivelmente de diferentes tipos.

As classes de chave e valor devem ser serializáveis ​​pela estrutura e, portanto, são necessárias para implementar a interface gravável. Além disso, as classes-chave devem implementar a interface WritableComparable para facilitar a classificação pela estrutura.

Os formatos de entrada e saída de um trabalho MapReduce estão na forma de pares de valores-chave -

(Entrada) <k1, v1> -> mapa -> <k2, v2> -> reduzir -> <k3, v3> (saída).

Entrada Resultado
Mapa <k1, v1> lista (<k2, v2>)
Reduzir <k2, lista (v2)> lista (<k3, v3>)

Implementação MapReduce

A tabela a seguir mostra os dados relativos ao consumo elétrico de uma organização. A tabela inclui o consumo elétrico mensal e a média anual de cinco anos consecutivos.

Jan Fev Mar Abr Maio Junho Jul Agosto Set Out Nov Dez Média
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Precisamos escrever aplicativos para processar os dados de entrada na tabela fornecida para encontrar o ano de uso máximo, o ano de uso mínimo e assim por diante. Esta tarefa é fácil para programadores com quantidade finita de registros, pois eles simplesmente escreverão a lógica para produzir a saída necessária e passarão os dados para o aplicativo escrito.

Vamos agora aumentar a escala dos dados de entrada. Suponha que temos que analisar o consumo elétrico de todas as indústrias de grande escala de um determinado estado. Quando escrevemos aplicativos para processar esses dados em massa,

  • Eles levarão muito tempo para serem executados.

  • Haverá tráfego de rede intenso quando movermos os dados da origem para o servidor de rede.

Para resolver esses problemas, temos a estrutura MapReduce.

Dados de entrada

Os dados acima são salvos como sample.txte fornecido como entrada. O arquivo de entrada se parece com o mostrado abaixo.

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Programa Exemplo

O programa a seguir para os dados de amostra usa a estrutura MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Salve o programa acima em ProcessUnits.java. A compilação e execução do programa são fornecidas abaixo.

Compilação e execução do programa ProcessUnits

Vamos supor que estamos no diretório inicial do usuário Hadoop (por exemplo, / home / hadoop).

Siga as etapas fornecidas a seguir para compilar e executar o programa acima.

Step 1 - Use o seguinte comando para criar um diretório para armazenar as classes java compiladas.

$ mkdir units

Step 2- Baixe Hadoop-core-1.2.1.jar, que é usado para compilar e executar o programa MapReduce. Baixe o jar em mvnrepository.com . Vamos supor que a pasta de download seja / home / hadoop /.

Step 3 - Os seguintes comandos são usados ​​para compilar o ProcessUnits.java programa e para criar um jar para o programa.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - O seguinte comando é usado para criar um diretório de entrada no HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - O seguinte comando é usado para copiar o arquivo de entrada chamado sample.txt no diretório de entrada do HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - O seguinte comando é usado para verificar os arquivos no diretório de entrada

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - O seguinte comando é usado para executar o aplicativo Eleunit_max, obtendo arquivos de entrada do diretório de entrada.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Espere um pouco até que o arquivo seja executado. Após a execução, a saída contém uma série de divisões de entrada, tarefas de mapa, tarefas de redutor, etc.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - O seguinte comando é usado para verificar os arquivos resultantes na pasta de saída.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - O seguinte comando é usado para ver a saída em Part-00000Arquivo. Este arquivo é gerado pelo HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

A seguir está a saída gerada pelo programa MapReduce -

1981 34
1984 40
1985 45

Step 10 - O seguinte comando é usado para copiar a pasta de saída do HDFS para o sistema de arquivos local.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop