Hadoop - MapReduce
MapReduce ist ein Framework, mit dem wir Anwendungen schreiben können, um große Datenmengen parallel auf großen Clustern von Standardhardware auf zuverlässige Weise zu verarbeiten.
Was ist MapReduce?
MapReduce ist eine Verarbeitungstechnik und ein Programmmodell für verteiltes Computing auf Basis von Java. Der MapReduce-Algorithmus enthält zwei wichtige Aufgaben, nämlich Map und Reduce. Map nimmt einen Datensatz und konvertiert ihn in einen anderen Datensatz, in dem einzelne Elemente in Tupel (Schlüssel / Wert-Paare) zerlegt werden. Zweitens reduzieren Sie die Aufgabe, bei der die Ausgabe einer Karte als Eingabe verwendet und diese Datentupel zu einem kleineren Satz von Tupeln kombiniert werden. Wie die Reihenfolge des Namens MapReduce impliziert, wird die Reduzierungsaufgabe immer nach dem Kartenjob ausgeführt.
Der Hauptvorteil von MapReduce besteht darin, dass die Datenverarbeitung einfach über mehrere Rechenknoten skaliert werden kann. Im MapReduce-Modell werden die Datenverarbeitungsprimitive als Mapper und Reducer bezeichnet. Das Zerlegen einer Datenverarbeitungsanwendung in Mapper und Reduzierer ist manchmal nicht trivial. Sobald wir jedoch eine Anwendung im MapReduce-Formular schreiben, ist die Skalierung der Anwendung auf Hunderte, Tausende oder sogar Zehntausende von Computern in einem Cluster lediglich eine Konfigurationsänderung. Diese einfache Skalierbarkeit hat viele Programmierer dazu bewegt, das MapReduce-Modell zu verwenden.
Der Algorithmus
Im Allgemeinen basiert das MapReduce-Paradigma darauf, den Computer an den Ort zu senden, an dem sich die Daten befinden!
Das MapReduce-Programm wird in drei Phasen ausgeführt, nämlich Map-Phase, Shuffle-Phase und Reduktionsphase.
Map stage- Die Aufgabe der Karte oder des Mappers besteht darin, die Eingabedaten zu verarbeiten. Im Allgemeinen liegen die Eingabedaten in Form einer Datei oder eines Verzeichnisses vor und werden im Hadoop-Dateisystem (HDFS) gespeichert. Die Eingabedatei wird zeilenweise an die Mapper-Funktion übergeben. Der Mapper verarbeitet die Daten und erstellt mehrere kleine Datenblöcke.
Reduce stage - Diese Phase ist die Kombination der Shuffle Bühne und die ReduceBühne. Die Aufgabe des Reduzierers besteht darin, die Daten zu verarbeiten, die vom Mapper stammen. Nach der Verarbeitung wird ein neuer Ausgabesatz erstellt, der im HDFS gespeichert wird.
Während eines MapReduce-Jobs sendet Hadoop die Map- und Reduce-Aufgaben an die entsprechenden Server im Cluster.
Das Framework verwaltet alle Details der Datenübergabe, z. B. das Ausgeben von Aufgaben, das Überprüfen der Aufgabenerfüllung und das Kopieren von Daten im Cluster zwischen den Knoten.
Der größte Teil der Datenverarbeitung findet auf Knoten mit Daten auf lokalen Festplatten statt, wodurch der Netzwerkverkehr reduziert wird.
Nach Abschluss der angegebenen Aufgaben sammelt und reduziert der Cluster die Daten, um ein geeignetes Ergebnis zu erhalten, und sendet sie an den Hadoop-Server zurück.
Ein- und Ausgänge (Java-Perspektive)
Das MapReduce-Framework verarbeitet <Schlüssel, Wert> -Paare, dh das Framework betrachtet die Eingabe in den Job als Satz von <Schlüssel, Wert> -Paaren und erzeugt einen Satz von <Schlüssel, Wert> -Paaren als Ausgabe des Jobs denkbar von verschiedenen Arten.
Der Schlüssel und die Werteklassen sollten vom Framework serialisiert werden und müssen daher die beschreibbare Schnittstelle implementieren. Darüber hinaus müssen die Schlüsselklassen die Schnittstelle Writable-Comparable implementieren, um das Sortieren nach Framework zu erleichtern. Eingabe- und Ausgabetypen von aMapReduce job - (Eingabe) <k1, v1> → Karte → <k2, v2> → reduzieren → <k3, v3> (Ausgabe).
Eingang | Ausgabe | |
---|---|---|
Karte | <k1, v1> | Liste (<k2, v2>) |
Reduzieren | <k2, Liste (v2)> | Liste (<k3, v3>) |
Terminologie
PayLoad - Anwendungen implementieren die Funktionen Map und Reduce und bilden den Kern des Jobs.
Mapper - Mapper ordnet die Eingabeschlüssel / Wert-Paare einem Satz von Zwischenschlüssel / Wert-Paaren zu.
NamedNode - Knoten, der das Hadoop Distributed File System (HDFS) verwaltet.
DataNode - Knoten, auf dem die Daten vor der Verarbeitung im Voraus präsentiert werden.
MasterNode - Knoten, auf dem JobTracker ausgeführt wird und der Jobanforderungen von Clients akzeptiert.
SlaveNode - Knoten, auf dem das Map and Reduce-Programm ausgeführt wird.
JobTracker - Plant Jobs und verfolgt die dem Task-Tracker zugewiesenen Jobs.
Task Tracker - Verfolgt die Aufgabe und meldet den Status an JobTracker.
Job - Ein Programm ist eine Ausführung eines Mapper und Reducer in einem Datensatz.
Task - Eine Ausführung eines Mappers oder eines Reduzierers auf einem Datenabschnitt.
Task Attempt - Eine bestimmte Instanz eines Versuchs, eine Aufgabe auf einem SlaveNode auszuführen.
Beispielszenario
Nachstehend sind die Daten zum Stromverbrauch einer Organisation aufgeführt. Es enthält den monatlichen Stromverbrauch und den Jahresdurchschnitt für verschiedene Jahre.
Jan. | Feb. | Beschädigen | Apr. | Kann | Jun | Jul | Aug. | Sep. | Okt. | Nov. | Dez. | Durchschn | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Wenn die oben genannten Daten als Eingabe angegeben werden, müssen wir Anwendungen schreiben, um sie zu verarbeiten und Ergebnisse zu erzielen, z. B. das Jahr der maximalen Nutzung, das Jahr der minimalen Nutzung usw. Dies ist ein Walkover für Programmierer mit einer begrenzten Anzahl von Datensätzen. Sie schreiben einfach die Logik, um die erforderliche Ausgabe zu erzeugen, und übergeben die Daten an die geschriebene Anwendung.
Denken Sie jedoch an die Daten, die den Stromverbrauch aller großen Industrien eines bestimmten Staates seit seiner Gründung darstellen.
Wenn wir Anwendungen schreiben, um solche Massendaten zu verarbeiten,
Die Ausführung wird viel Zeit in Anspruch nehmen.
Es wird einen starken Netzwerkverkehr geben, wenn wir Daten von der Quelle auf den Netzwerkserver usw. verschieben.
Um diese Probleme zu lösen, haben wir das MapReduce-Framework.
Eingabedaten
Die obigen Daten werden als gespeichert sample.txtund als Eingabe gegeben. Die Eingabedatei sieht wie folgt aus.
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
Beispielprogramm
Im Folgenden wird das Programm für die Beispieldaten mit dem MapReduce-Framework angegeben.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits {
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable ,/*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()) {
lasttoken = s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
//Reduce function
public void reduce( Text key, Iterator <IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxavg = 30;
int val = Integer.MIN_VALUE;
while (values.hasNext()) {
if((val = values.next().get())>maxavg) {
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception {
JobConf conf = new JobConf(ProcessUnits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Speichern Sie das obige Programm als ProcessUnits.java. Die Kompilierung und Ausführung des Programms wird nachfolgend erläutert.
Zusammenstellung und Ausführung des Programms für Prozesseinheiten
Nehmen wir an, wir befinden uns im Home-Verzeichnis eines Hadoop-Benutzers (z. B. / home / hadoop).
Führen Sie die folgenden Schritte aus, um das obige Programm zu kompilieren und auszuführen.
Schritt 1
Mit dem folgenden Befehl erstellen Sie ein Verzeichnis zum Speichern der kompilierten Java-Klassen.
$ mkdir units
Schritt 2
Herunterladen Hadoop-core-1.2.1.jar,Hiermit wird das MapReduce-Programm kompiliert und ausgeführt. Besuchen Sie den folgenden Link mvnrepository.com , um das Glas herunterzuladen. Nehmen wir an, der heruntergeladene Ordner ist/home/hadoop/.
Schritt 3
Die folgenden Befehle werden zum Kompilieren von verwendet ProcessUnits.java Programm und Erstellen eines Glases für das Programm.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Schritt 4
Der folgende Befehl wird verwendet, um ein Eingabeverzeichnis in HDFS zu erstellen.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Schritt 5
Der folgende Befehl wird verwendet, um die Eingabedatei mit dem Namen zu kopieren sample.txtim Eingabeverzeichnis von HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Schritt 6
Der folgende Befehl wird verwendet, um die Dateien im Eingabeverzeichnis zu überprüfen.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Schritt 7
Der folgende Befehl wird verwendet, um die Anwendung Eleunit_max auszuführen, indem die Eingabedateien aus dem Eingabeverzeichnis übernommen werden.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Warten Sie eine Weile, bis die Datei ausgeführt wird. Nach der Ausführung enthält die Ausgabe, wie unten gezeigt, die Anzahl der Eingabesplits, die Anzahl der Map-Aufgaben, die Anzahl der Reduzierungsaufgaben usw.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2 Job Counters
Launched map tasks = 2
Launched reduce tasks = 1
Data-local map tasks = 2
Total time spent by all maps in occupied slots (ms) = 146137
Total time spent by all reduces in occupied slots (ms) = 441
Total time spent by all map tasks (ms) = 14613
Total time spent by all reduce tasks (ms) = 44120
Total vcore-seconds taken by all map tasks = 146137
Total vcore-seconds taken by all reduce tasks = 44120
Total megabyte-seconds taken by all map tasks = 149644288
Total megabyte-seconds taken by all reduce tasks = 45178880
Map-Reduce Framework
Map input records = 5
Map output records = 5
Map output bytes = 45
Map output materialized bytes = 67
Input split bytes = 208
Combine input records = 5
Combine output records = 5
Reduce input groups = 5
Reduce shuffle bytes = 6
Reduce input records = 5
Reduce output records = 5
Spilled Records = 10
Shuffled Maps = 2
Failed Shuffles = 0
Merged Map outputs = 2
GC time elapsed (ms) = 948
CPU time spent (ms) = 5160
Physical memory (bytes) snapshot = 47749120
Virtual memory (bytes) snapshot = 2899349504
Total committed heap usage (bytes) = 277684224
File Output Format Counters
Bytes Written = 40
Schritt 8
Der folgende Befehl wird verwendet, um die resultierenden Dateien im Ausgabeordner zu überprüfen.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Schritt 9
Der folgende Befehl wird verwendet, um die Ausgabe in anzuzeigen Part-00000 Datei. Diese Datei wird von HDFS generiert.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Unten sehen Sie die vom MapReduce-Programm generierte Ausgabe.
1981 34
1984 40
1985 45
Schritt 10
Der folgende Befehl wird verwendet, um den Ausgabeordner von HDFS zur Analyse in das lokale Dateisystem zu kopieren.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
Wichtige Befehle
Alle Hadoop-Befehle werden von der aufgerufen $HADOOP_HOME/bin/hadoopBefehl. Wenn Sie das Hadoop-Skript ohne Argumente ausführen, wird die Beschreibung für alle Befehle gedruckt.
Usage - hadoop [--config confdir] BEFEHL
In der folgenden Tabelle sind die verfügbaren Optionen und ihre Beschreibung aufgeführt.
Sr.Nr. | Option & Beschreibung |
---|---|
1 | namenode -format Formatiert das DFS-Dateisystem. |
2 | secondarynamenode Führt den sekundären DFS-Namensknoten aus. |
3 | namenode Führt den DFS-Namensknoten aus. |
4 | datanode Führt einen DFS-Datenknoten aus. |
5 | dfsadmin Führt einen DFS-Administratorclient aus. |
6 | mradmin Führt einen Map-Reduce-Admin-Client aus. |
7 | fsck Führt ein Dienstprogramm zur Überprüfung des DFS-Dateisystems aus. |
8 | fs Führt einen generischen Dateisystem-Benutzerclient aus. |
9 | balancer Führt ein Dienstprogramm zum Clusterausgleich aus. |
10 | oiv Wendet den Offline-Fsimage-Viewer auf ein Fsimage an. |
11 | fetchdt Ruft ein Delegierungstoken vom NameNode ab. |
12 | jobtracker Führt den MapReduce-Job-Tracker-Knoten aus. |
13 | pipes Führt einen Pipes-Job aus. |
14 | tasktracker Führt einen MapReduce-Task-Tracker-Knoten aus. |
15 | historyserver Führt Jobverlaufsserver als eigenständigen Daemon aus. |
16 | job Manipuliert die MapReduce-Jobs. |
17 | queue Ruft Informationen zu JobQueues ab. |
18 | version Druckt die Version. |
19 | jar <jar> Führt eine JAR-Datei aus. |
20 | distcp <srcurl> <desturl> Kopiert Dateien oder Verzeichnisse rekursiv. |
21 | distcp2 <srcurl> <desturl> DistCp Version 2. |
22 | archive -archiveName NAME -p <parent path> <src>* <dest> Erstellt ein Hadoop-Archiv. |
23 | classpath Druckt den Klassenpfad, der zum Abrufen des Hadoop-Jars und der erforderlichen Bibliotheken erforderlich ist. |
24 | daemonlog Abrufen / Festlegen der Protokollstufe für jeden Dämon |
So interagieren Sie mit MapReduce-Jobs
Verwendung - Hadoop-Job [GENERIC_OPTIONS]
Im Folgenden sind die allgemeinen Optionen aufgeführt, die in einem Hadoop-Job verfügbar sind.
Sr.Nr. | GENERIC_OPTION & Beschreibung |
---|---|
1 | -submit <job-file> Sendet den Job. |
2 | -status <job-id> Druckt die Karte und reduziert den Fertigstellungsgrad sowie alle Auftragszähler. |
3 | -counter <job-id> <group-name> <countername> Druckt den Zählerwert. |
4 | -kill <job-id> Tötet den Job. |
5 | -events <job-id> <fromevent-#> <#-of-events> Druckt die vom Jobtracker empfangenen Ereignisdetails für den angegebenen Bereich. |
6 | -history [all] <jobOutputDir> - history < jobOutputDir> Druckt Auftragsdetails, fehlgeschlagene und getötete Tippdetails. Weitere Details zum Job, z. B. erfolgreiche Aufgaben und Aufgabenversuche für jede Aufgabe, können durch Angabe der Option [alle] angezeigt werden. |
7 | -list[all] Zeigt alle Jobs an. -list zeigt nur Jobs an, die noch abgeschlossen werden müssen. |
8 | -kill-task <task-id> Tötet die Aufgabe. Getötete Aufgaben werden NICHT auf fehlgeschlagene Versuche angerechnet. |
9 | -fail-task <task-id> Schlägt die Aufgabe fehl. Fehlgeschlagene Aufgaben werden gegen fehlgeschlagene Versuche gezählt. |
10 | -set-priority <job-id> <priority> Ändert die Priorität des Jobs. Zulässige Prioritätswerte sind VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW |
Den Status des Jobs anzeigen
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
Anzeigen des Verlaufs des Jobausgabeverzeichnisses
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
Den Job töten
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004