Hadoop - MapReduce

MapReduce ist ein Framework, mit dem wir Anwendungen schreiben können, um große Datenmengen parallel auf großen Clustern von Standardhardware auf zuverlässige Weise zu verarbeiten.

Was ist MapReduce?

MapReduce ist eine Verarbeitungstechnik und ein Programmmodell für verteiltes Computing auf Basis von Java. Der MapReduce-Algorithmus enthält zwei wichtige Aufgaben, nämlich Map und Reduce. Map nimmt einen Datensatz und konvertiert ihn in einen anderen Datensatz, in dem einzelne Elemente in Tupel (Schlüssel / Wert-Paare) zerlegt werden. Zweitens reduzieren Sie die Aufgabe, bei der die Ausgabe einer Karte als Eingabe verwendet und diese Datentupel zu einem kleineren Satz von Tupeln kombiniert werden. Wie die Reihenfolge des Namens MapReduce impliziert, wird die Reduzierungsaufgabe immer nach dem Kartenjob ausgeführt.

Der Hauptvorteil von MapReduce besteht darin, dass die Datenverarbeitung einfach über mehrere Rechenknoten skaliert werden kann. Im MapReduce-Modell werden die Datenverarbeitungsprimitive als Mapper und Reducer bezeichnet. Das Zerlegen einer Datenverarbeitungsanwendung in Mapper und Reduzierer ist manchmal nicht trivial. Sobald wir jedoch eine Anwendung im MapReduce-Formular schreiben, ist die Skalierung der Anwendung auf Hunderte, Tausende oder sogar Zehntausende von Computern in einem Cluster lediglich eine Konfigurationsänderung. Diese einfache Skalierbarkeit hat viele Programmierer dazu bewegt, das MapReduce-Modell zu verwenden.

Der Algorithmus

  • Im Allgemeinen basiert das MapReduce-Paradigma darauf, den Computer an den Ort zu senden, an dem sich die Daten befinden!

  • Das MapReduce-Programm wird in drei Phasen ausgeführt, nämlich Map-Phase, Shuffle-Phase und Reduktionsphase.

    • Map stage- Die Aufgabe der Karte oder des Mappers besteht darin, die Eingabedaten zu verarbeiten. Im Allgemeinen liegen die Eingabedaten in Form einer Datei oder eines Verzeichnisses vor und werden im Hadoop-Dateisystem (HDFS) gespeichert. Die Eingabedatei wird zeilenweise an die Mapper-Funktion übergeben. Der Mapper verarbeitet die Daten und erstellt mehrere kleine Datenblöcke.

    • Reduce stage - Diese Phase ist die Kombination der Shuffle Bühne und die ReduceBühne. Die Aufgabe des Reduzierers besteht darin, die Daten zu verarbeiten, die vom Mapper stammen. Nach der Verarbeitung wird ein neuer Ausgabesatz erstellt, der im HDFS gespeichert wird.

  • Während eines MapReduce-Jobs sendet Hadoop die Map- und Reduce-Aufgaben an die entsprechenden Server im Cluster.

  • Das Framework verwaltet alle Details der Datenübergabe, z. B. das Ausgeben von Aufgaben, das Überprüfen der Aufgabenerfüllung und das Kopieren von Daten im Cluster zwischen den Knoten.

  • Der größte Teil der Datenverarbeitung findet auf Knoten mit Daten auf lokalen Festplatten statt, wodurch der Netzwerkverkehr reduziert wird.

  • Nach Abschluss der angegebenen Aufgaben sammelt und reduziert der Cluster die Daten, um ein geeignetes Ergebnis zu erhalten, und sendet sie an den Hadoop-Server zurück.

Ein- und Ausgänge (Java-Perspektive)

Das MapReduce-Framework verarbeitet <Schlüssel, Wert> -Paare, dh das Framework betrachtet die Eingabe in den Job als Satz von <Schlüssel, Wert> -Paaren und erzeugt einen Satz von <Schlüssel, Wert> -Paaren als Ausgabe des Jobs denkbar von verschiedenen Arten.

Der Schlüssel und die Werteklassen sollten vom Framework serialisiert werden und müssen daher die beschreibbare Schnittstelle implementieren. Darüber hinaus müssen die Schlüsselklassen die Schnittstelle Writable-Comparable implementieren, um das Sortieren nach Framework zu erleichtern. Eingabe- und Ausgabetypen von aMapReduce job - (Eingabe) <k1, v1> → Karte → <k2, v2> → reduzieren → <k3, v3> (Ausgabe).

Eingang Ausgabe
Karte <k1, v1> Liste (<k2, v2>)
Reduzieren <k2, Liste (v2)> Liste (<k3, v3>)

Terminologie

  • PayLoad - Anwendungen implementieren die Funktionen Map und Reduce und bilden den Kern des Jobs.

  • Mapper - Mapper ordnet die Eingabeschlüssel / Wert-Paare einem Satz von Zwischenschlüssel / Wert-Paaren zu.

  • NamedNode - Knoten, der das Hadoop Distributed File System (HDFS) verwaltet.

  • DataNode - Knoten, auf dem die Daten vor der Verarbeitung im Voraus präsentiert werden.

  • MasterNode - Knoten, auf dem JobTracker ausgeführt wird und der Jobanforderungen von Clients akzeptiert.

  • SlaveNode - Knoten, auf dem das Map and Reduce-Programm ausgeführt wird.

  • JobTracker - Plant Jobs und verfolgt die dem Task-Tracker zugewiesenen Jobs.

  • Task Tracker - Verfolgt die Aufgabe und meldet den Status an JobTracker.

  • Job - Ein Programm ist eine Ausführung eines Mapper und Reducer in einem Datensatz.

  • Task - Eine Ausführung eines Mappers oder eines Reduzierers auf einem Datenabschnitt.

  • Task Attempt - Eine bestimmte Instanz eines Versuchs, eine Aufgabe auf einem SlaveNode auszuführen.

Beispielszenario

Nachstehend sind die Daten zum Stromverbrauch einer Organisation aufgeführt. Es enthält den monatlichen Stromverbrauch und den Jahresdurchschnitt für verschiedene Jahre.

Jan. Feb. Beschädigen Apr. Kann Jun Jul Aug. Sep. Okt. Nov. Dez. Durchschn
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Wenn die oben genannten Daten als Eingabe angegeben werden, müssen wir Anwendungen schreiben, um sie zu verarbeiten und Ergebnisse zu erzielen, z. B. das Jahr der maximalen Nutzung, das Jahr der minimalen Nutzung usw. Dies ist ein Walkover für Programmierer mit einer begrenzten Anzahl von Datensätzen. Sie schreiben einfach die Logik, um die erforderliche Ausgabe zu erzeugen, und übergeben die Daten an die geschriebene Anwendung.

Denken Sie jedoch an die Daten, die den Stromverbrauch aller großen Industrien eines bestimmten Staates seit seiner Gründung darstellen.

Wenn wir Anwendungen schreiben, um solche Massendaten zu verarbeiten,

  • Die Ausführung wird viel Zeit in Anspruch nehmen.

  • Es wird einen starken Netzwerkverkehr geben, wenn wir Daten von der Quelle auf den Netzwerkserver usw. verschieben.

Um diese Probleme zu lösen, haben wir das MapReduce-Framework.

Eingabedaten

Die obigen Daten werden als gespeichert sample.txtund als Eingabe gegeben. Die Eingabedatei sieht wie folgt aus.

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

Beispielprogramm

Im Folgenden wird das Programm für die Beispieldaten mit dem MapReduce-Framework angegeben.

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
}

Speichern Sie das obige Programm als ProcessUnits.java. Die Kompilierung und Ausführung des Programms wird nachfolgend erläutert.

Zusammenstellung und Ausführung des Programms für Prozesseinheiten

Nehmen wir an, wir befinden uns im Home-Verzeichnis eines Hadoop-Benutzers (z. B. / home / hadoop).

Führen Sie die folgenden Schritte aus, um das obige Programm zu kompilieren und auszuführen.

Schritt 1

Mit dem folgenden Befehl erstellen Sie ein Verzeichnis zum Speichern der kompilierten Java-Klassen.

$ mkdir units

Schritt 2

Herunterladen Hadoop-core-1.2.1.jar,Hiermit wird das MapReduce-Programm kompiliert und ausgeführt. Besuchen Sie den folgenden Link mvnrepository.com , um das Glas herunterzuladen. Nehmen wir an, der heruntergeladene Ordner ist/home/hadoop/.

Schritt 3

Die folgenden Befehle werden zum Kompilieren von verwendet ProcessUnits.java Programm und Erstellen eines Glases für das Programm.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ .

Schritt 4

Der folgende Befehl wird verwendet, um ein Eingabeverzeichnis in HDFS zu erstellen.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Schritt 5

Der folgende Befehl wird verwendet, um die Eingabedatei mit dem Namen zu kopieren sample.txtim Eingabeverzeichnis von HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Schritt 6

Der folgende Befehl wird verwendet, um die Dateien im Eingabeverzeichnis zu überprüfen.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Schritt 7

Der folgende Befehl wird verwendet, um die Anwendung Eleunit_max auszuführen, indem die Eingabedateien aus dem Eingabeverzeichnis übernommen werden.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Warten Sie eine Weile, bis die Datei ausgeführt wird. Nach der Ausführung enthält die Ausgabe, wie unten gezeigt, die Anzahl der Eingabesplits, die Anzahl der Map-Aufgaben, die Anzahl der Reduzierungsaufgaben usw.

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40

Schritt 8

Der folgende Befehl wird verwendet, um die resultierenden Dateien im Ausgabeordner zu überprüfen.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Schritt 9

Der folgende Befehl wird verwendet, um die Ausgabe in anzuzeigen Part-00000 Datei. Diese Datei wird von HDFS generiert.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Unten sehen Sie die vom MapReduce-Programm generierte Ausgabe.

1981    34 
1984    40 
1985    45

Schritt 10

Der folgende Befehl wird verwendet, um den Ausgabeordner von HDFS zur Analyse in das lokale Dateisystem zu kopieren.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

Wichtige Befehle

Alle Hadoop-Befehle werden von der aufgerufen $HADOOP_HOME/bin/hadoopBefehl. Wenn Sie das Hadoop-Skript ohne Argumente ausführen, wird die Beschreibung für alle Befehle gedruckt.

Usage - hadoop [--config confdir] BEFEHL

In der folgenden Tabelle sind die verfügbaren Optionen und ihre Beschreibung aufgeführt.

Sr.Nr. Option & Beschreibung
1

namenode -format

Formatiert das DFS-Dateisystem.

2

secondarynamenode

Führt den sekundären DFS-Namensknoten aus.

3

namenode

Führt den DFS-Namensknoten aus.

4

datanode

Führt einen DFS-Datenknoten aus.

5

dfsadmin

Führt einen DFS-Administratorclient aus.

6

mradmin

Führt einen Map-Reduce-Admin-Client aus.

7

fsck

Führt ein Dienstprogramm zur Überprüfung des DFS-Dateisystems aus.

8

fs

Führt einen generischen Dateisystem-Benutzerclient aus.

9

balancer

Führt ein Dienstprogramm zum Clusterausgleich aus.

10

oiv

Wendet den Offline-Fsimage-Viewer auf ein Fsimage an.

11

fetchdt

Ruft ein Delegierungstoken vom NameNode ab.

12

jobtracker

Führt den MapReduce-Job-Tracker-Knoten aus.

13

pipes

Führt einen Pipes-Job aus.

14

tasktracker

Führt einen MapReduce-Task-Tracker-Knoten aus.

15

historyserver

Führt Jobverlaufsserver als eigenständigen Daemon aus.

16

job

Manipuliert die MapReduce-Jobs.

17

queue

Ruft Informationen zu JobQueues ab.

18

version

Druckt die Version.

19

jar <jar>

Führt eine JAR-Datei aus.

20

distcp <srcurl> <desturl>

Kopiert Dateien oder Verzeichnisse rekursiv.

21

distcp2 <srcurl> <desturl>

DistCp Version 2.

22

archive -archiveName NAME -p <parent path> <src>* <dest>

Erstellt ein Hadoop-Archiv.

23

classpath

Druckt den Klassenpfad, der zum Abrufen des Hadoop-Jars und der erforderlichen Bibliotheken erforderlich ist.

24

daemonlog

Abrufen / Festlegen der Protokollstufe für jeden Dämon

So interagieren Sie mit MapReduce-Jobs

Verwendung - Hadoop-Job [GENERIC_OPTIONS]

Im Folgenden sind die allgemeinen Optionen aufgeführt, die in einem Hadoop-Job verfügbar sind.

Sr.Nr. GENERIC_OPTION & Beschreibung
1

-submit <job-file>

Sendet den Job.

2

-status <job-id>

Druckt die Karte und reduziert den Fertigstellungsgrad sowie alle Auftragszähler.

3

-counter <job-id> <group-name> <countername>

Druckt den Zählerwert.

4

-kill <job-id>

Tötet den Job.

5

-events <job-id> <fromevent-#> <#-of-events>

Druckt die vom Jobtracker empfangenen Ereignisdetails für den angegebenen Bereich.

6

-history [all] <jobOutputDir> - history < jobOutputDir>

Druckt Auftragsdetails, fehlgeschlagene und getötete Tippdetails. Weitere Details zum Job, z. B. erfolgreiche Aufgaben und Aufgabenversuche für jede Aufgabe, können durch Angabe der Option [alle] angezeigt werden.

7

-list[all]

Zeigt alle Jobs an. -list zeigt nur Jobs an, die noch abgeschlossen werden müssen.

8

-kill-task <task-id>

Tötet die Aufgabe. Getötete Aufgaben werden NICHT auf fehlgeschlagene Versuche angerechnet.

9

-fail-task <task-id>

Schlägt die Aufgabe fehl. Fehlgeschlagene Aufgaben werden gegen fehlgeschlagene Versuche gezählt.

10

-set-priority <job-id> <priority>

Ändert die Priorität des Jobs. Zulässige Prioritätswerte sind VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

Den Status des Jobs anzeigen

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

Anzeigen des Verlaufs des Jobausgabeverzeichnisses

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

Den Job töten

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004