Hadoop - Streaming
Hadoop-Streaming ist ein Dienstprogramm, das mit der Hadoop-Distribution geliefert wird. Mit diesem Dienstprogramm können Sie Map / Reduce-Jobs mit einer beliebigen ausführbaren Datei oder einem Skript als Mapper und / oder Reduzierer erstellen und ausführen.
Beispiel mit Python
Beim Hadoop-Streaming betrachten wir das Problem der Wortanzahl. Jeder Job in Hadoop muss zwei Phasen haben: Mapper und Reducer. Wir haben Codes für den Mapper und den Reduzierer im Python-Skript geschrieben, um es unter Hadoop auszuführen. Man kann das auch in Perl und Ruby schreiben.
Mapper-Phasencode
!/usr/bin/python
import sys
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Break the line into words
words = myline.split()
# Iterate the words list
for myword in words:
# Write the results to standard output
print '%s\t%s' % (myword, 1)
Stellen Sie sicher, dass diese Datei über die Ausführungsberechtigung verfügt (chmod + x / home /experte / hadoop-1.2.1 / mapper.py).
Reduzierphasencode
#!/usr/bin/python
from operator import itemgetter
import sys
current_word = ""
current_count = 0
word = ""
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Split the input we got from mapper.py word,
count = myline.split('\t', 1)
# Convert count variable to integer
try:
count = int(count)
except ValueError:
# Count was not a number, so silently ignore this line continue
if current_word == word:
current_count += count
else:
if current_word:
# Write result to standard output print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# Do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Speichern Sie die Mapper- und Reducer-Codes in mapper.py und reducer.py im Hadoop-Home-Verzeichnis. Stellen Sie sicher, dass diese Dateien über Ausführungsberechtigungen verfügen (chmod + x mapper.py und chmod + x reducer.py). Da Python einrückungsempfindlich ist, kann derselbe Code über den folgenden Link heruntergeladen werden.
Ausführung des WordCount-Programms
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
-input input_dirs \
-output output_dir \
-mapper <path/mapper.py \
-reducer <path/reducer.py
Wobei "\" für die Zeilenfortsetzung zur besseren Lesbarkeit verwendet wird.
Zum Beispiel,
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py
Wie Streaming funktioniert
Im obigen Beispiel sind sowohl der Mapper als auch der Reduzierer Python-Skripte, die die Eingabe von der Standardeingabe lesen und die Ausgabe an die Standardausgabe ausgeben. Das Dienstprogramm erstellt einen Map / Reduce-Job, sendet den Job an einen geeigneten Cluster und überwacht den Fortschritt des Jobs, bis er abgeschlossen ist.
Wenn ein Skript für Mapper angegeben wird, startet jede Mapper-Task das Skript als separaten Prozess, wenn der Mapper initialisiert wird. Während die Mapper-Task ausgeführt wird, konvertiert sie ihre Eingaben in Zeilen und führt die Zeilen der Standardeingabe (STDIN) des Prozesses zu. In der Zwischenzeit sammelt der Mapper die zeilenorientierten Ausgaben von der Standardausgabe (STDOUT) des Prozesses und konvertiert jede Zeile in ein Schlüssel / Wert-Paar, das als Ausgabe des Mappers gesammelt wird. Standardmäßig ist das Präfix einer Zeile bis zum ersten Tabulatorzeichen der Schlüssel und der Rest der Zeile (ohne das Tabulatorzeichen) ist der Wert. Wenn die Zeile kein Tabulatorzeichen enthält, wird die gesamte Zeile als Schlüssel betrachtet und der Wert ist null. Dies kann jedoch nach Bedarf angepasst werden.
Wenn ein Skript für Reduzierer angegeben ist, startet jede Reduziereraufgabe das Skript als separaten Prozess, und der Reduzierer wird initialisiert. Während die Reduzierungsaufgabe ausgeführt wird, konvertiert sie ihre Eingabe-Schlüssel / Werte-Paare in Zeilen und führt die Zeilen der Standardeingabe (STDIN) des Prozesses zu. In der Zwischenzeit sammelt der Reduzierer die zeilenorientierten Ausgaben von der Standardausgabe (STDOUT) des Prozesses und wandelt jede Zeile in ein Schlüssel / Wert-Paar um, das als Ausgang des Reduzierers gesammelt wird. Standardmäßig ist das Präfix einer Zeile bis zum ersten Tabulatorzeichen der Schlüssel und der Rest der Zeile (ohne das Tabulatorzeichen) der Wert. Dies kann jedoch gemäß den spezifischen Anforderungen angepasst werden.
Wichtige Befehle
Parameter | Optionen | Beschreibung |
---|---|---|
-Eingabeverzeichnis / Dateiname | Erforderlich | Eingabeort für Mapper. |
-ausgabeverzeichnisname | Erforderlich | Ausgangsort für Reduzierstück. |
-mapper ausführbare Datei oder Skript oder JavaClassName | Erforderlich | Mapper ausführbare Datei. |
-reducer ausführbare Datei oder Skript oder JavaClassName | Erforderlich | Reduzierbare ausführbare Datei. |
-Datei Dateiname | Optional | Stellt die ausführbare Mapper-, Reducer- oder Combiner-Datei lokal auf den Rechenknoten zur Verfügung. |
-inputformat JavaClassName | Optional | Die von Ihnen angegebene Klasse sollte Schlüssel / Wert-Paare der Textklasse zurückgeben. Wenn nicht angegeben, wird standardmäßig TextInputFormat verwendet. |
-outputformat JavaClassName | Optional | Die von Ihnen angegebene Klasse sollte Schlüssel / Wert-Paare der Textklasse annehmen. Wenn nicht angegeben, wird standardmäßig TextOutputformat verwendet. |
-Partitionierer JavaClassName | Optional | Klasse, die bestimmt, an welche Reduzierung ein Schlüssel gesendet wird. |
-combiner StreamingCommand oder JavaClassName | Optional | Combiner ausführbar für die Kartenausgabe. |
-cmdenv name = value | Optional | Übergibt die Umgebungsvariable an Streaming-Befehle. |
-Inputreader | Optional | Aus Gründen der Abwärtskompatibilität: Gibt eine Datensatzleserklasse an (anstelle einer Eingabeformatklasse). |
-verbose | Optional | Ausführliche Ausgabe. |
-lazyOutput | Optional | Erzeugt träge Ausgabe. Wenn das Ausgabeformat beispielsweise auf FileOutputFormat basiert, wird die Ausgabedatei nur beim ersten Aufruf von output.collect (oder Context.write) erstellt. |
-numReduceTasks | Optional | Gibt die Anzahl der Reduzierungen an. |
-mapdebug | Optional | Skript, das aufgerufen werden soll, wenn die Kartenaufgabe fehlschlägt. |
-reduzierter Fehler | Optional | Skript zum Aufrufen, wenn die Reduzierungsaufgabe fehlschlägt. |