MapReduce - Hadoop-Implementierung

MapReduce ist ein Framework, das zum Schreiben von Anwendungen verwendet wird, um große Datenmengen auf großen Clustern von Standardhardware zuverlässig zu verarbeiten. Dieses Kapitel führt Sie durch die Funktionsweise von MapReduce im Hadoop-Framework mit Java.

MapReduce-Algorithmus

Im Allgemeinen basiert das MapReduce-Paradigma auf dem Senden von Programmen zur Kartenreduzierung an Computer, auf denen sich die tatsächlichen Daten befinden.

  • Während eines MapReduce-Jobs sendet Hadoop Map- und Reduce-Aufgaben an die entsprechenden Server im Cluster.

  • Das Framework verwaltet alle Details der Datenübergabe, z. B. das Ausgeben von Aufgaben, das Überprüfen der Aufgabenerfüllung und das Kopieren von Daten im Cluster zwischen den Knoten.

  • Der größte Teil der Datenverarbeitung findet auf den Knoten mit Daten auf lokalen Festplatten statt, wodurch der Netzwerkverkehr reduziert wird.

  • Nach Abschluss einer bestimmten Aufgabe sammelt und reduziert der Cluster die Daten, um ein geeignetes Ergebnis zu erhalten, und sendet sie an den Hadoop-Server zurück.

Ein- und Ausgänge (Java-Perspektive)

Das MapReduce-Framework arbeitet mit Schlüssel-Wert-Paaren, dh das Framework betrachtet die Eingabe in den Job als eine Reihe von Schlüssel-Wert-Paaren und erzeugt eine Reihe von Schlüssel-Wert-Paaren als Ausgabe des Jobs, möglicherweise von verschiedenen Typen.

Die Schlüssel- und Wertklassen müssen vom Framework serialisierbar sein und daher muss die beschreibbare Schnittstelle implementiert werden. Darüber hinaus müssen die Schlüsselklassen die WritableComparable-Schnittstelle implementieren, um das Sortieren nach Framework zu erleichtern.

Sowohl das Eingabe- als auch das Ausgabeformat eines MapReduce-Jobs liegen in Form von Schlüssel-Wert-Paaren vor.

(Eingabe) <k1, v1> -> map -> <k2, v2> -> reduzieren -> <k3, v3> (Ausgabe).

Eingang Ausgabe
Karte <k1, v1> Liste (<k2, v2>)
Reduzieren <k2, Liste (v2)> Liste (<k3, v3>)

MapReduce-Implementierung

Die folgende Tabelle zeigt die Daten zum Stromverbrauch einer Organisation. Die Tabelle enthält den monatlichen Stromverbrauch und den Jahresdurchschnitt für fünf aufeinanderfolgende Jahre.

Jan. Feb. Beschädigen Apr. Kann Jun Jul Aug. Sep. Okt. Nov. Dez. Durchschn
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Wir müssen Anwendungen schreiben, um die Eingabedaten in der angegebenen Tabelle zu verarbeiten, um das Jahr der maximalen Nutzung, das Jahr der minimalen Nutzung usw. zu ermitteln. Diese Aufgabe ist für Programmierer mit einer begrenzten Anzahl von Datensätzen einfach, da sie einfach die Logik schreiben, um die erforderliche Ausgabe zu erzeugen, und die Daten an die geschriebene Anwendung weitergeben.

Erhöhen wir nun den Maßstab der Eingabedaten. Angenommen, wir müssen den Stromverbrauch aller Großindustrien eines bestimmten Staates analysieren. Wenn wir Anwendungen schreiben, um solche Massendaten zu verarbeiten,

  • Die Ausführung wird viel Zeit in Anspruch nehmen.

  • Es wird starken Netzwerkverkehr geben, wenn wir Daten von der Quelle auf den Netzwerkserver verschieben.

Um diese Probleme zu lösen, haben wir das MapReduce-Framework.

Eingabedaten

Die obigen Daten werden als gespeichert sample.txtund als Eingabe gegeben. Die Eingabedatei sieht wie folgt aus.

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Beispielprogramm

Das folgende Programm für die Beispieldaten verwendet das MapReduce-Framework.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Speichern Sie das obige Programm in ProcessUnits.java. Die Zusammenstellung und Ausführung des Programms ist unten angegeben.

Kompilierung und Ausführung des ProcessUnits-Programms

Nehmen wir an, wir befinden uns im Home-Verzeichnis des Hadoop-Benutzers (z. B. / home / hadoop).

Führen Sie die folgenden Schritte aus, um das obige Programm zu kompilieren und auszuführen.

Step 1 - Verwenden Sie den folgenden Befehl, um ein Verzeichnis zum Speichern der kompilierten Java-Klassen zu erstellen.

$ mkdir units

Step 2- Laden Sie Hadoop-core-1.2.1.jar herunter, mit dem das MapReduce-Programm kompiliert und ausgeführt wird. Laden Sie das Glas von mvnrepository.com herunter . Nehmen wir an, der Download-Ordner lautet / home / hadoop /.

Step 3 - Die folgenden Befehle werden zum Kompilieren der verwendet ProcessUnits.java Programm und ein Glas für das Programm zu erstellen.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - Mit dem folgenden Befehl wird ein Eingabeverzeichnis in HDFS erstellt.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Mit dem folgenden Befehl wird die benannte Eingabedatei kopiert sample.txt im Eingabeverzeichnis von HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - Mit dem folgenden Befehl werden die Dateien im Eingabeverzeichnis überprüft

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Mit dem folgenden Befehl wird die Anwendung Eleunit_max ausgeführt, indem Eingabedateien aus dem Eingabeverzeichnis entnommen werden.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Warten Sie eine Weile, bis die Datei ausgeführt wird. Nach der Ausführung enthält die Ausgabe eine Reihe von Eingabeaufteilungen, Zuordnungsaufgaben, Reduzierungsaufgaben usw.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - Mit dem folgenden Befehl werden die resultierenden Dateien im Ausgabeordner überprüft.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Mit dem folgenden Befehl wird die Ausgabe in angezeigt Part-00000Datei. Diese Datei wird von HDFS generiert.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Es folgt die vom MapReduce-Programm generierte Ausgabe:

1981 34
1984 40
1985 45

Step 10 - Mit dem folgenden Befehl wird der Ausgabeordner von HDFS in das lokale Dateisystem kopiert.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop