Hadoop - Streaming

Hadoop-Streaming ist ein Dienstprogramm, das mit der Hadoop-Distribution geliefert wird. Mit diesem Dienstprogramm können Sie Map / Reduce-Jobs mit einer beliebigen ausführbaren Datei oder einem Skript als Mapper und / oder Reduzierer erstellen und ausführen.

Beispiel mit Python

Beim Hadoop-Streaming betrachten wir das Problem der Wortanzahl. Jeder Job in Hadoop muss zwei Phasen haben: Mapper und Reducer. Wir haben Codes für den Mapper und den Reduzierer im Python-Skript geschrieben, um es unter Hadoop auszuführen. Man kann das auch in Perl und Ruby schreiben.

Mapper-Phasencode

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Break the line into words 
   words = myline.split() 

   # Iterate the words list
   for myword in words:
      # Write the results to standard output 
      print '%s\t%s' % (myword, 1)

Stellen Sie sicher, dass diese Datei über die Ausführungsberechtigung verfügt (chmod + x / home /experte / hadoop-1.2.1 / mapper.py).

Reduzierphasencode

#!/usr/bin/python

from operator import itemgetter 
import sys 

current_word = ""
current_count = 0 
word = "" 

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Split the input we got from mapper.py word, 
   count = myline.split('\t', 1) 

   # Convert count variable to integer 
   try: 
      count = int(count) 

   except ValueError: 
      # Count was not a number, so silently ignore this line continue

   if current_word == word: 
   current_count += count 
   else: 
      if current_word: 
         # Write result to standard output print '%s\t%s' % (current_word, current_count) 
   
      current_count = count
      current_word = word

# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s\t%s' % (current_word, current_count)

Speichern Sie die Mapper- und Reducer-Codes in mapper.py und reducer.py im Hadoop-Home-Verzeichnis. Stellen Sie sicher, dass diese Dateien über Ausführungsberechtigungen verfügen (chmod + x mapper.py und chmod + x reducer.py). Da Python einrückungsempfindlich ist, kann derselbe Code über den folgenden Link heruntergeladen werden.

Ausführung des WordCount-Programms

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \ 
   -output output_dir \ 
   -mapper <path/mapper.py \ 
   -reducer <path/reducer.py

Wobei "\" für die Zeilenfortsetzung zur besseren Lesbarkeit verwendet wird.

Zum Beispiel,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

Wie Streaming funktioniert

Im obigen Beispiel sind sowohl der Mapper als auch der Reduzierer Python-Skripte, die die Eingabe von der Standardeingabe lesen und die Ausgabe an die Standardausgabe ausgeben. Das Dienstprogramm erstellt einen Map / Reduce-Job, sendet den Job an einen geeigneten Cluster und überwacht den Fortschritt des Jobs, bis er abgeschlossen ist.

Wenn ein Skript für Mapper angegeben wird, startet jede Mapper-Task das Skript als separaten Prozess, wenn der Mapper initialisiert wird. Während die Mapper-Task ausgeführt wird, konvertiert sie ihre Eingaben in Zeilen und führt die Zeilen der Standardeingabe (STDIN) des Prozesses zu. In der Zwischenzeit sammelt der Mapper die zeilenorientierten Ausgaben von der Standardausgabe (STDOUT) des Prozesses und konvertiert jede Zeile in ein Schlüssel / Wert-Paar, das als Ausgabe des Mappers gesammelt wird. Standardmäßig ist das Präfix einer Zeile bis zum ersten Tabulatorzeichen der Schlüssel und der Rest der Zeile (ohne das Tabulatorzeichen) ist der Wert. Wenn die Zeile kein Tabulatorzeichen enthält, wird die gesamte Zeile als Schlüssel betrachtet und der Wert ist null. Dies kann jedoch nach Bedarf angepasst werden.

Wenn ein Skript für Reduzierer angegeben ist, startet jede Reduziereraufgabe das Skript als separaten Prozess, und der Reduzierer wird initialisiert. Während die Reduzierungsaufgabe ausgeführt wird, konvertiert sie ihre Eingabe-Schlüssel / Werte-Paare in Zeilen und führt die Zeilen der Standardeingabe (STDIN) des Prozesses zu. In der Zwischenzeit sammelt der Reduzierer die zeilenorientierten Ausgaben von der Standardausgabe (STDOUT) des Prozesses und wandelt jede Zeile in ein Schlüssel / Wert-Paar um, das als Ausgang des Reduzierers gesammelt wird. Standardmäßig ist das Präfix einer Zeile bis zum ersten Tabulatorzeichen der Schlüssel und der Rest der Zeile (ohne das Tabulatorzeichen) der Wert. Dies kann jedoch gemäß den spezifischen Anforderungen angepasst werden.

Wichtige Befehle

Parameter Optionen Beschreibung
-Eingabeverzeichnis / Dateiname Erforderlich Eingabeort für Mapper.
-ausgabeverzeichnisname Erforderlich Ausgangsort für Reduzierstück.
-mapper ausführbare Datei oder Skript oder JavaClassName Erforderlich Mapper ausführbare Datei.
-reducer ausführbare Datei oder Skript oder JavaClassName Erforderlich Reduzierbare ausführbare Datei.
-Datei Dateiname Optional Stellt die ausführbare Mapper-, Reducer- oder Combiner-Datei lokal auf den Rechenknoten zur Verfügung.
-inputformat JavaClassName Optional Die von Ihnen angegebene Klasse sollte Schlüssel / Wert-Paare der Textklasse zurückgeben. Wenn nicht angegeben, wird standardmäßig TextInputFormat verwendet.
-outputformat JavaClassName Optional Die von Ihnen angegebene Klasse sollte Schlüssel / Wert-Paare der Textklasse annehmen. Wenn nicht angegeben, wird standardmäßig TextOutputformat verwendet.
-Partitionierer JavaClassName Optional Klasse, die bestimmt, an welche Reduzierung ein Schlüssel gesendet wird.
-combiner StreamingCommand oder JavaClassName Optional Combiner ausführbar für die Kartenausgabe.
-cmdenv name = value Optional Übergibt die Umgebungsvariable an Streaming-Befehle.
-Inputreader Optional Aus Gründen der Abwärtskompatibilität: Gibt eine Datensatzleserklasse an (anstelle einer Eingabeformatklasse).
-verbose Optional Ausführliche Ausgabe.
-lazyOutput Optional Erzeugt träge Ausgabe. Wenn das Ausgabeformat beispielsweise auf FileOutputFormat basiert, wird die Ausgabedatei nur beim ersten Aufruf von output.collect (oder Context.write) erstellt.
-numReduceTasks Optional Gibt die Anzahl der Reduzierungen an.
-mapdebug Optional Skript, das aufgerufen werden soll, wenn die Kartenaufgabe fehlschlägt.
-reduzierter Fehler Optional Skript zum Aufrufen, wenn die Reduzierungsaufgabe fehlschlägt.