MapReduce - szybki przewodnik

MapReduce to model programowania do pisania aplikacji, które mogą przetwarzać duże zbiory danych równolegle na wielu węzłach. MapReduce zapewnia możliwości analityczne do analizowania ogromnych ilości złożonych danych.

Co to jest Big Data?

Big Data to zbiór dużych zbiorów danych, których nie można przetwarzać przy użyciu tradycyjnych technik obliczeniowych. Na przykład ilość danych potrzebnych Facebookowi lub YouTube do codziennego zbierania i zarządzania może należeć do kategorii Big Data. Jednak Big Data to nie tylko skala i wielkość, ale obejmuje również jeden lub więcej z następujących aspektów - prędkość, różnorodność, objętość i złożoność.

Dlaczego MapReduce?

Tradycyjne systemy korporacyjne mają zwykle scentralizowany serwer do przechowywania i przetwarzania danych. Poniższa ilustracja przedstawia schematyczny widok tradycyjnego systemu przedsiębiorstwa. Tradycyjny model z pewnością nie nadaje się do przetwarzania ogromnych ilości skalowalnych danych i nie może być dostosowany do standardowych serwerów baz danych. Co więcej, scentralizowany system tworzy zbyt duże wąskie gardło podczas jednoczesnego przetwarzania wielu plików.

Google rozwiązało ten problem z wąskim gardłem za pomocą algorytmu o nazwie MapReduce. MapReduce dzieli zadanie na małe części i przypisuje je do wielu komputerów. Później wyniki są gromadzone w jednym miejscu i integrowane w celu utworzenia zestawu danych wyników.

Jak działa MapReduce?

Algorytm MapReduce zawiera dwa ważne zadania, a mianowicie Mapowanie i Zmniejszanie.

  • Zadanie mapy pobiera zestaw danych i konwertuje go na inny zestaw danych, w którym poszczególne elementy są dzielone na krotki (pary klucz-wartość).

  • Zadanie Reduce pobiera dane wyjściowe z mapy jako dane wejściowe i łączy te krotki danych (pary klucz-wartość) w mniejszy zestaw krotek.

Zadanie redukcji jest zawsze wykonywane po utworzeniu mapy.

Przyjrzyjmy się teraz bliżej każdej z faz i spróbujmy zrozumieć ich znaczenie.

  • Input Phase - Tutaj mamy czytnik rekordów, który tłumaczy każdy rekord w pliku wejściowym i wysyła przeanalizowane dane do programu odwzorowującego w postaci par klucz-wartość.

  • Map - Map to funkcja zdefiniowana przez użytkownika, która pobiera serię par klucz-wartość i przetwarza każdą z nich, aby wygenerować zero lub więcej par klucz-wartość.

  • Intermediate Keys - Pary klucz-wartość generowane przez program odwzorowujący nazywane są kluczami pośrednimi.

  • Combiner- Sumator jest rodzajem lokalnego reduktora, który grupuje podobne dane z fazy mapy w możliwe do zidentyfikowania zbiory. Pobiera klucze pośrednie z programu odwzorowującego jako dane wejściowe i stosuje kod zdefiniowany przez użytkownika w celu agregacji wartości w małym zakresie jednego programu odwzorowującego. Nie jest częścią głównego algorytmu MapReduce; jest to opcjonalne.

  • Shuffle and Sort- Zadanie Reducer rozpoczyna się od kroku Shuffle and Sort. Pobiera zgrupowane pary klucz-wartość na komputer lokalny, na którym działa reduktor. Poszczególne pary klucz-wartość są sortowane według klucza w większą listę danych. Lista danych grupuje równoważne klucze razem, dzięki czemu ich wartości mogą być łatwo iterowane w zadaniu reduktora.

  • Reducer- Reducer pobiera pogrupowane sparowane dane klucz-wartość jako dane wejściowe i uruchamia funkcję Reduktora na każdym z nich. Tutaj dane mogą być agregowane, filtrowane i łączone na wiele sposobów, a to wymaga szerokiego zakresu przetwarzania. Po zakończeniu wykonywania daje zero lub więcej par klucz-wartość do ostatniego kroku.

  • Output Phase - W fazie wyjściowej mamy program formatujący dane wyjściowe, który tłumaczy końcowe pary klucz-wartość z funkcji Reducer i zapisuje je do pliku za pomocą programu do zapisywania rekordów.

Spróbujmy zrozumieć dwa zadania Map & f Reduce za pomocą małego diagramu -

MapReduce-Example

Weźmy przykład ze świata rzeczywistego, aby zrozumieć moc MapReduce. Twitter otrzymuje około 500 milionów tweetów dziennie, co daje prawie 3000 tweetów na sekundę. Poniższa ilustracja pokazuje, jak głośnik wysokotonowy zarządza swoimi tweetami za pomocą MapReduce.

Jak pokazano na ilustracji, algorytm MapReduce wykonuje następujące czynności -

  • Tokenize - Tokenizuje tweety na mapy tokenów i zapisuje je jako pary klucz-wartość.

  • Filter - Filtruje niechciane słowa z map tokenów i zapisuje przefiltrowane mapy jako pary klucz-wartość.

  • Count - Generuje licznik żetonów na słowo.

  • Aggregate Counters - Przygotowuje agregat podobnych wartości liczników w małe możliwe do zarządzania jednostki.

Algorytm MapReduce zawiera dwa ważne zadania, a mianowicie Mapowanie i Zmniejszanie.

  • Zadanie mapy jest realizowane za pomocą klasy Mapper
  • Zadanie redukcji jest realizowane za pomocą klasy reduktora.

Klasa Mapper pobiera dane wejściowe, tokenizuje je, mapuje i sortuje. Dane wyjściowe klasy Mapper są używane jako dane wejściowe przez klasę Reducer, która z kolei wyszukuje pasujące pary i redukuje je.

MapReduce implementuje różne algorytmy matematyczne, aby podzielić zadanie na małe części i przypisać je do wielu systemów. Pod względem technicznym algorytm MapReduce pomaga w wysyłaniu zadań Map & Reduce do odpowiednich serwerów w klastrze.

Te algorytmy matematyczne mogą obejmować:

  • Sorting
  • Searching
  • Indexing
  • TF-IDF

Sortowanie

Sortowanie jest jednym z podstawowych algorytmów MapReduce do przetwarzania i analizy danych. MapReduce implementuje algorytm sortowania, aby automatycznie sortować wyjściowe pary klucz-wartość z programu odwzorowującego według ich kluczy.

  • Metody sortowania są zaimplementowane w samej klasie mappera.

  • W fazie Shuffle and Sort, po tokenizowaniu wartości w klasie mappera, plik Context class (klasa zdefiniowana przez użytkownika) zbiera pasujące klucze o wartościach jako kolekcję.

  • Aby zebrać podobne pary klucz-wartość (klucze pośrednie), klasa Mapper korzysta z pomocy RawComparator klasy, aby posortować pary klucz-wartość.

  • Zestaw pośrednich par klucz-wartość dla danego Reducera jest automatycznie sortowany przez Hadoop w celu utworzenia par klucz-wartość (K2, {V2, V2,…}), zanim zostaną przedstawione Reducerowi.

Badawczy

Wyszukiwanie odgrywa ważną rolę w algorytmie MapReduce. Pomaga w fazie sumatora (opcjonalnie) oraz w fazie reduktora. Spróbujmy zrozumieć, jak działa wyszukiwanie, na przykładzie.

Przykład

Poniższy przykład pokazuje, jak MapReduce wykorzystuje algorytm wyszukiwania, aby poznać szczegóły pracownika, który pobiera najwyższe wynagrodzenie w danym zbiorze danych pracowników.

  • Załóżmy, że mamy dane pracowników w czterech różnych plikach - A, B, C i D. Załóżmy również, że we wszystkich czterech plikach znajdują się zduplikowane rekordy pracowników z powodu wielokrotnego importowania danych pracowników ze wszystkich tabel bazy danych. Zobacz poniższą ilustrację.

  • The Map phaseprzetwarza każdy plik wejściowy i dostarcza dane pracowników w parach klucz-wartość (<k, v>: <imię i nazwisko, wynagrodzenie>). Zobacz poniższą ilustrację.

  • The combiner phase(technika wyszukiwania) zaakceptuje dane wejściowe z fazy mapy jako parę klucz-wartość z nazwiskiem pracownika i jego wynagrodzeniem. Korzystając z techniki wyszukiwania, sumator sprawdzi wszystkie pensje pracowników, aby znaleźć pracownika o najwyższej pensji w każdym pliku. Zobacz poniższy fragment.

<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary

if(v(second employee).salary > Max){
   Max = v(salary);
}

else{
   Continue checking;
}

Oczekiwany wynik jest następujący -

<satish, 26000>

<gopal, 50000>

<kiran, 45000>

<manisha, 45000>

  • Reducer phase- W każdym pliku znajdziesz najwyżej opłacanego pracownika. Aby uniknąć nadmiarowości, sprawdź wszystkie pary <k, v> i wyeliminuj zduplikowane wpisy, jeśli występują. Ten sam algorytm jest używany między czterema parami <k, v>, które pochodzą z czterech plików wejściowych. Ostateczny wynik powinien wyglądać następująco -

<gopal, 50000>

Indeksowanie

Zwykle indeksowanie służy do wskazywania konkretnych danych i ich adresu. Wykonuje indeksowanie wsadowe plików wejściowych dla określonego programu Mapper.

Technika indeksowania, która jest zwykle używana w MapReduce, jest znana jako inverted index.Wyszukiwarki takie jak Google i Bing używają techniki indeksowania odwróconego. Spróbujmy zrozumieć, jak działa indeksowanie, na prostym przykładzie.

Przykład

Poniższy tekst stanowi dane wejściowe dla odwróconego indeksowania. Tutaj T [0], T [1] it [2] to nazwy plików, a ich zawartość jest w cudzysłowie.

T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"

Po zastosowaniu algorytmu indeksowania otrzymujemy następujący wynik -

"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}

Tutaj „a”: {2} oznacza, że ​​termin „a” pojawia się w pliku T [2]. Podobnie „jest”: {0, 1, 2} oznacza, że ​​termin „jest” pojawia się w plikach T [0], T [1] i T [2].

TF-IDF

TF-IDF to algorytm przetwarzania tekstu, który jest skrótem od Term Frequency - Inverse Document Frequency. Jest to jeden z powszechnych algorytmów analizy sieci. Tutaj termin „częstotliwość” odnosi się do tego, ile razy termin pojawia się w dokumencie.

Termin Częstotliwość (TF)

Mierzy, jak często dany termin występuje w dokumencie. Oblicza się, ile razy słowo pojawia się w dokumencie, podzielone przez całkowitą liczbę słów w tym dokumencie.

TF(the) = (Number of times term the ‘the’ appears in a document) / (Total number of terms in the document)

Odwrotna częstotliwość dokumentu (IDF)

Mierzy znaczenie terminu. Jest obliczany przez liczbę dokumentów w tekstowej bazie danych podzieloną przez liczbę dokumentów, w których występuje określony termin.

Podczas obliczania TF wszystkie terminy są uważane za równie ważne. Oznacza to, że TF liczy częstotliwość terminów dla normalnych słów, takich jak „jest”, „a”, „co” itp. Dlatego musimy znać częste terminy podczas skalowania w górę rzadkich, obliczając następujące -

IDF(the) = log_e(Total number of documents / Number of documents with term ‘the’ in it).

Algorytm jest wyjaśniony poniżej za pomocą małego przykładu.

Przykład

Rozważmy dokument zawierający 1000 słów, w tym słowo hivepojawia się 50 razy. TF dlahive jest wtedy (50/1000) = 0,05.

Teraz załóżmy, że mamy 10 milionów dokumentów i słowo hivepojawia się w 1000 z nich. Następnie IDF jest obliczany jako log (10 000 000/1 000) = 4.

Iloczynem tych wielkości jest waga TF-IDF - 0,05 × 4 = 0,20.

MapReduce działa tylko w systemach operacyjnych o smaku Linux i ma wbudowaną strukturę Hadoop. Aby zainstalować framework Hadoop, musimy wykonać następujące kroki.

Weryfikacja instalacji JAVA

Javę należy zainstalować w systemie przed zainstalowaniem Hadoop. Użyj następującego polecenia, aby sprawdzić, czy w systemie jest zainstalowana Java.

$ java –version

Jeśli Java jest już zainstalowana w twoim systemie, zobaczysz następującą odpowiedź -

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

Jeśli nie masz zainstalowanej Javy w swoim systemie, wykonaj kroki podane poniżej.

Instalowanie Java

Krok 1

Pobierz najnowszą wersję Java z poniższego linku - ten link .

Po pobraniu możesz zlokalizować plik jdk-7u71-linux-x64.tar.gz w folderze Pobrane.

Krok 2

Użyj następujących poleceń, aby wyodrębnić zawartość jdk-7u71-linux-x64.gz.

$ cd Downloads/
$ ls jdk-7u71-linux-x64.gz $ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz

Krok 3

Aby udostępnić Javę wszystkim użytkownikom, musisz przenieść ją do lokalizacji „/ usr / local /”. Przejdź do roota i wpisz następujące polecenia -

$ su
password:
# mv jdk1.7.0_71 /usr/local/java
# exit

Krok 4

Aby ustawić zmienne PATH i JAVA_HOME, dodaj następujące polecenia do pliku ~ / .bashrc.

export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin

Zastosuj wszystkie zmiany do aktualnie działającego systemu.

$ source ~/.bashrc

Krok 5

Użyj następujących poleceń, aby skonfigurować alternatywy Java -

# alternatives --install /usr/bin/java java usr/local/java/bin/java 2

# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2

# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2

# alternatives --set java usr/local/java/bin/java

# alternatives --set javac usr/local/java/bin/javac

# alternatives --set jar usr/local/java/bin/jar

Teraz sprawdź instalację za pomocą polecenia java -version z terminala.

Weryfikacja instalacji Hadoop

Hadoop musi być zainstalowany w systemie przed zainstalowaniem MapReduce. Zweryfikujmy instalację Hadoop za pomocą następującego polecenia -

$ hadoop version

Jeśli Hadoop jest już zainstalowany w twoim systemie, otrzymasz następującą odpowiedź -

Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

Jeśli usługa Hadoop nie jest zainstalowana w systemie, wykonaj następujące czynności.

Pobieranie Hadoop

Pobierz Hadoop 2.4.1 z Apache Software Foundation i wyodrębnij jego zawartość za pomocą następujących poleceń.

$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

Instalowanie Hadoop w trybie pseudo-rozproszonym

Poniższe kroki służą do instalowania Hadoop 2.4.1 w trybie pseudo rozproszonym.

Krok 1 - Konfiguracja Hadoop

Możesz ustawić zmienne środowiskowe Hadoop, dołączając następujące polecenia do pliku ~ / .bashrc.

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

Zastosuj wszystkie zmiany do aktualnie działającego systemu.

$ source ~/.bashrc

Krok 2 - Konfiguracja Hadoop

Wszystkie pliki konfiguracyjne Hadoop można znaleźć w lokalizacji „$ HADOOP_HOME / etc / hadoop”. Musisz wprowadzić odpowiednie zmiany w tych plikach konfiguracyjnych zgodnie z infrastrukturą Hadoop.

$ cd $HADOOP_HOME/etc/hadoop

Aby tworzyć programy Hadoop przy użyciu języka Java, musisz zresetować zmienne środowiskowe Java w hadoop-env.sh plik, zastępując wartość JAVA_HOME lokalizacją Java w systemie.

export JAVA_HOME=/usr/local/java

Aby skonfigurować Hadoop, musisz edytować następujące pliki -

  • core-site.xml
  • hdfs-site.xml
  • yarn-site.xml
  • mapred-site.xml

core-site.xml

core-site.xml zawiera następujące informacje -

  • Numer portu używany dla wystąpienia Hadoop
  • Pamięć przydzielona dla systemu plików
  • Limit pamięci do przechowywania danych
  • Rozmiar buforów do odczytu / zapisu

Otwórz plik core-site.xml i dodaj następujące właściwości między tagami <configuration> i </configuration>.

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000 </value>
   </property>
</configuration>

hdfs-site.xml

hdfs-site.xml zawiera następujące informacje -

  • Wartość danych replikacji
  • Ścieżka do namenode
  • Ścieżka danych w lokalnych systemach plików (miejsce, w którym chcesz przechowywać infra Hadoop)

Załóżmy następujące dane.

dfs.replication (data replication value) = 1

(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode

Otwórz ten plik i dodaj następujące właściwości między tagami <configuration>, </configuration>.

<configuration>

   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   
   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
   </property>
   
   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
   </property>
   
</configuration>

Note - W powyższym pliku wszystkie wartości właściwości są zdefiniowane przez użytkownika i można wprowadzać zmiany zgodnie z infrastrukturą Hadoop.

yarn-site.xml

Ten plik służy do konfigurowania przędzy w Hadoop. Otwórz plik yarn-site.xml i dodaj następujące właściwości między tagami <configuration>, </configuration>.

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

Ten plik służy do określenia używanej przez nas struktury MapReduce. Domyślnie Hadoop zawiera szablon yarn-site.xml. Przede wszystkim musisz skopiować plik z mapred-site.xml.template do pliku mapred-site.xml za pomocą następującego polecenia.

$ cp mapred-site.xml.template mapred-site.xml

Otwórz plik mapred-site.xml i dodaj następujące właściwości między tagami <configuration>, </configuration>.

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

Weryfikacja instalacji Hadoop

Poniższe kroki służą do weryfikacji instalacji Hadoop.

Krok 1 - Konfiguracja nazwy węzła

Skonfiguruj namenode za pomocą polecenia „hdfs namenode -format” w następujący sposób -

$ cd ~ $ hdfs namenode -format

Oczekiwany wynik jest następujący -

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:

/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

Krok 2 - Weryfikacja plików dfs na platformie Hadoop

Wykonaj następujące polecenie, aby uruchomić system plików Hadoop.

$ start-dfs.sh

Oczekiwany wynik jest następujący -

10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

Krok 3 - weryfikacja skryptu przędzy

Następujące polecenie służy do uruchamiania skryptu przędzy. Wykonanie tego polecenia spowoduje uruchomienie demonów przędzy.

$ start-yarn.sh

Oczekiwany wynik jest następujący -

starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

Krok 4 - Dostęp do Hadoop w przeglądarce

Domyślny numer portu dostępu do Hadoop to 50070. Użyj następującego adresu URL, aby pobrać usługi Hadoop w przeglądarce.

http://localhost:50070/

Poniższy zrzut ekranu przedstawia przeglądarkę Hadoop.

Krok 5 - Zweryfikuj wszystkie aplikacje klastra

Domyślny numer portu dostępu do wszystkich aplikacji klastra to 8088. Aby skorzystać z tej usługi, użyj następującego adresu URL.

http://localhost:8088/

Poniższy zrzut ekranu przedstawia przeglądarkę klastra Hadoop.

W tym rozdziale przyjrzymy się bliżej klasom i ich metodom, które są zaangażowane w operacje programowania MapReduce. Będziemy przede wszystkim skupiać się na następujących kwestiach -

  • Interfejs JobContext
  • Klasa zawodu
  • Mapper Class
  • Klasa reduktora

Interfejs JobContext

Interfejs JobContext jest super interfejsem dla wszystkich klas, który definiuje różne zadania w MapReduce. Zapewnia widok tylko do odczytu zadania dostarczonego do zadań podczas ich działania.

Poniżej przedstawiono interfejsy podrzędne interfejsu JobContext.

S.No. Opis podinterfejsu
1. MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Definiuje kontekst, który jest nadawany Mapperowi.

2. ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Definiuje kontekst, który jest przekazywany do Reducer.

Klasa zadania to główna klasa implementująca interfejs JobContext.

Klasa zawodu

Klasa Job jest najważniejszą klasą interfejsu API MapReduce. Umożliwia użytkownikowi skonfigurowanie zadania, przesłanie go, kontrolę jego wykonania i sprawdzenie stanu. Metody set działają tylko do momentu przesłania zadania, po czym rzucą wyjątek IllegalStateException.

Zwykle użytkownik tworzy aplikację, opisuje różne aspekty zadania, a następnie przesyła zadanie i monitoruje jego postęp.

Oto przykład, jak przesłać ofertę pracy -

// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

Konstruktorzy

Poniżej znajduje się podsumowanie konstruktora klasy Job.

S.Nr Podsumowanie konstruktora
1 Job()
2 Job(Konfiguracja)
3 Job(Konfiguracja konfiguracji, String nazwa_zadania)

Metody

Oto niektóre z ważnych metod klasy Job:

S.Nr Opis metody
1 getJobName()

Nazwa zadania określona przez użytkownika.

2 getJobState()

Zwraca bieżący stan zadania.

3 isComplete()

Sprawdza, czy zadanie jest zakończone, czy nie.

4 setInputFormatClass()

Ustawia InputFormat dla zadania.

5 setJobName(String name)

Ustawia nazwę zadania określoną przez użytkownika.

6 setOutputFormatClass()

Ustawia format wyjściowy zadania.

7 setMapperClass(Class)

Ustawia Mapper dla zadania.

8 setReducerClass(Class)

Ustawia reduktor dla zadania.

9 setPartitionerClass(Class)

Ustawia partycjoner dla zadania.

10 setCombinerClass(Class)

Ustawia łącznik dla zadania.

Mapper Class

Klasa Mapper definiuje zadanie Map. Mapuje wejściowe pary klucz-wartość na zestaw pośrednich par klucz-wartość. Mapy to indywidualne zadania, które przekształcają rekordy wejściowe w rekordy pośrednie. Przekształcone rekordy pośrednie nie muszą być tego samego typu co rekordy wejściowe. Dana para wejściowa może być mapowana do zera lub wielu par wyjściowych.

metoda

mapjest najważniejszą metodą klasy Mapper. Składnia jest zdefiniowana poniżej -

map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)

Ta metoda jest wywoływana raz dla każdej pary klucz-wartość w podziale danych wejściowych.

Klasa reduktora

Klasa Reducer definiuje zadanie Reduce w MapReduce. Redukuje zbiór wartości pośrednich, które mają wspólny klucz, do mniejszego zestawu wartości. Implementacje reduktora mogą uzyskać dostęp do konfiguracji zadania za pośrednictwem metody JobContext.getConfiguration (). Reduktor ma trzy podstawowe fazy - Tasuj, Sortuj i Zmniejsz.

  • Shuffle - Reduktor kopiuje posortowane dane wyjściowe z każdego programu mapującego przy użyciu protokołu HTTP w całej sieci.

  • Sort- Framework scala-sortuje dane wejściowe Reduktora według kluczy (ponieważ różni Mapperzy mogą wyprowadzać ten sam klucz). Fazy ​​tasowania i sortowania występują jednocześnie, tj. Podczas pobierania danych wyjściowych są one łączone.

  • Reduce - W tej fazie dla każdego <klucz, (zbiór wartości)> w posortowanych danych wejściowych wywoływana jest metoda redukuj (obiekt, iterowalny, kontekst).

metoda

reducejest najważniejszą metodą klasy Reducer. Składnia jest zdefiniowana poniżej -

reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

Ta metoda jest wywoływana raz dla każdego klucza w kolekcji par klucz-wartość.

MapReduce to platforma służąca do pisania aplikacji w celu niezawodnego przetwarzania ogromnych ilości danych na dużych klastrach standardowego sprzętu. W tym rozdziale omówiono działanie MapReduce w środowisku Hadoop przy użyciu języka Java.

Algorytm MapReduce

Zasadniczo paradygmat MapReduce polega na wysyłaniu programów zmniejszających mapy do komputerów, na których znajdują się rzeczywiste dane.

  • Podczas zadania MapReduce Hadoop wysyła zadania Map i Reduce do odpowiednich serwerów w klastrze.

  • Struktura zarządza wszystkimi szczegółami przekazywania danych, takimi jak wydawanie zadań, weryfikacja ukończenia zadań i kopiowanie danych w obrębie klastra między węzłami.

  • Większość obliczeń odbywa się w węzłach z danymi na dyskach lokalnych, co zmniejsza ruch w sieci.

  • Po wykonaniu danego zadania klaster zbiera i redukuje dane do odpowiedniego wyniku i odsyła je z powrotem do serwera Hadoop.

Dane wejściowe i wyjściowe (perspektywa Java)

Struktura MapReduce działa na parach klucz-wartość, to znaczy, że struktura wyświetla dane wejściowe zadania jako zestaw par klucz-wartość i tworzy zestaw par klucz-wartość jako dane wyjściowe zadania, prawdopodobnie różnych typów.

Klasy klucza i wartości muszą być możliwe do serializacji przez platformę i dlatego wymagane jest zaimplementowanie interfejsu Writable. Ponadto kluczowe klasy muszą implementować interfejs WritableComparable, aby ułatwić sortowanie według struktury.

Zarówno format wejściowy, jak i wyjściowy zadania MapReduce mają postać par klucz-wartość -

(Wejście) <k1, v1> -> map -> <k2, v2> -> zredukuj -> <k3, v3> (wyjście).

Wejście Wynik
Mapa <k1, v1> lista (<k2, v2>)
Zmniejszyć <k2, list (v2)> lista (<k3, v3>)

Implementacja MapReduce

Poniższa tabela przedstawia dane dotyczące zużycia energii elektrycznej przez organizację. Tabela zawiera miesięczne zużycie energii elektrycznej oraz średnią roczną z pięciu kolejnych lat.

Jan Luty Zniszczyć Kwi Może Jun Lip Sie Wrz Paź Lis Grudzień Śr
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Musimy napisać aplikacje, które przetwarzają dane wejściowe w podanej tabeli, aby znaleźć rok maksymalnego użycia, rok minimalnego użytkowania i tak dalej. Zadanie to jest łatwe dla programistów ze skończoną liczbą rekordów, ponieważ po prostu napiszą logikę, aby wygenerować wymagane dane wyjściowe, i przekażą dane do napisanej aplikacji.

Podnieśmy teraz skalę danych wejściowych. Załóżmy, że musimy przeanalizować zużycie energii elektrycznej we wszystkich gałęziach przemysłu na dużą skalę w danym stanie. Kiedy piszemy aplikacje do przetwarzania takich danych zbiorczych,

  • Wykonanie ich zajmie dużo czasu.

  • Podczas przenoszenia danych ze źródła na serwer sieciowy będzie duży ruch w sieci.

Aby rozwiązać te problemy, mamy strukturę MapReduce.

Dane wejściowe

Powyższe dane są zapisywane jako sample.txti podane jako dane wejściowe. Plik wejściowy wygląda jak pokazano poniżej.

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Przykładowy program

Poniższy program dla przykładowych danych używa struktury MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Zapisz powyższy program w ProcessUnits.java. Kompilację i wykonanie programu podano poniżej.

Kompilacja i wykonanie programu ProcessUnits

Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (np. / Home / hadoop).

Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.

Step 1 - Użyj poniższego polecenia, aby utworzyć katalog do przechowywania skompilowanych klas Java.

$ mkdir units

Step 2- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Pobierz jar ze strony mvnrepository.com . Załóżmy, że folder pobierania to / home / hadoop /.

Step 3 - Poniższe polecenia służą do kompilowania pliku ProcessUnits.java program i stworzyć słoik dla programu.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - Następujące polecenie służy do tworzenia katalogu wejściowego w formacie HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Następujące polecenie służy do kopiowania pliku wejściowego o nazwie sample.txt w katalogu wejściowym HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - Następujące polecenie służy do weryfikacji plików w katalogu wejściowym

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Następujące polecenie służy do uruchamiania aplikacji Eleunit_max poprzez pobieranie plików wejściowych z katalogu wejściowego.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania, zadań reduktora itp.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - Następujące polecenie służy do weryfikacji plików wynikowych w folderze wyjściowym.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Następujące polecenie służy do wyświetlania danych wyjściowych w formacie Part-00000plik. Ten plik jest generowany przez HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Poniżej przedstawiono dane wyjściowe wygenerowane przez program MapReduce -

1981 34
1984 40
1985 45

Step 10 - Następujące polecenie służy do kopiowania folderu wyjściowego z HDFS do lokalnego systemu plików.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop

Partycjoner działa jak warunek podczas przetwarzania wejściowego zestawu danych. Faza podziału ma miejsce po fazie mapy i przed fazą redukcji.

Liczba partycji jest równa liczbie reduktorów. Oznacza to, że partycjoner podzieli dane zgodnie z liczbą reduktorów. Dlatego dane przekazywane z jednego partycjonera są przetwarzane przez jeden reduktor.

Partitioner

Partycjoner dzieli pary klucz-wartość pośrednich wyjść mapy. Dzieli dane na partycje za pomocą warunku zdefiniowanego przez użytkownika, który działa jak funkcja skrótu. Całkowita liczba partycji jest taka sama, jak liczba zadań reduktora dla zadania. Weźmy przykład, aby zrozumieć, jak działa partycjoner.

Implementacja MapReduce Partitioner

Dla wygody załóżmy, że mamy małą tabelkę o nazwie Pracownik z następującymi danymi. Użyjemy tych przykładowych danych jako naszego zestawu danych wejściowych, aby zademonstrować, jak działa partycjoner.

ID Nazwa Wiek Płeć Wynagrodzenie
1201 gopal 45 Męski 50 000
1202 manisha 40 Płeć żeńska 50 000
1203 khalil 34 Męski 30 000
1204 prasanth 30 Męski 30 000
1205 kiran 20 Męski 40 000
1206 laxmi 25 Płeć żeńska 35 000
1207 bhavya 20 Płeć żeńska 15 000
1208 reshma 19 Płeć żeńska 15 000
1209 kranthi 22 Męski 22 000
1210 Satish 24 Męski 25 000
1211 Kryszna 25 Męski 25 000
1212 Arshad 28 Męski 20 000
1213 Lavanya 18 Płeć żeńska 8,000

Musimy napisać aplikację do przetwarzania zbioru danych wejściowych, aby znaleźć pracownika o najwyższym wynagrodzeniu według płci w różnych grupach wiekowych (na przykład poniżej 20 lat, od 21 do 30 lat, powyżej 30 lat).

Dane wejściowe

Powyższe dane są zapisywane jako input.txt w katalogu „/ home / hadoop / hadoopPartitioner” i podany jako dane wejściowe.

1201 gopal 45 Męski 50000
1202 manisha 40 Płeć żeńska 51000
1203 khaleel 34 Męski 30000
1204 prasanth 30 Męski 31000
1205 kiran 20 Męski 40000
1206 laxmi 25 Płeć żeńska 35000
1207 bhavya 20 Płeć żeńska 15000
1208 reshma 19 Płeć żeńska 14000
1209 kranthi 22 Męski 22000
1210 Satish 24 Męski 25000
1211 Kryszna 25 Męski 26000
1212 Arshad 28 Męski 20000
1213 Lavanya 18 Płeć żeńska 8000

Na podstawie podanych danych wejściowych następuje algorytmiczne wyjaśnienie programu.

Zadania mapy

Zadanie mapy akceptuje pary klucz-wartość jako dane wejściowe, podczas gdy dane tekstowe są w pliku tekstowym. Dane wejściowe dla tego zadania mapy są następujące -

Input - Klucz byłby wzorcem, takim jak „dowolny klucz specjalny + nazwa pliku + numer wiersza” (przykład: klucz = @ input1), a wartością byłyby dane w tym wierszu (przykład: wartość = 1201 \ t gopal \ t 45 \ t Mężczyzna \ t 50000).

Method - Działanie tego zadania mapy jest następujące -

  • Przeczytać value (dane rekordu), która jest wartością wejściową z listy argumentów w ciągu.

  • Używając funkcji split, oddziel płeć i zapisz w zmiennej ciągu.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Wyślij informacje o płci i dane rekordu value jako wyjściowa para klucz-wartość z zadania mapy do pliku partition task.

context.write(new Text(gender), new Text(value));
  • Powtórz wszystkie powyższe kroki dla wszystkich rekordów w pliku tekstowym.

Output - Otrzymasz dane dotyczące płci i wartości danych rekordu jako pary klucz-wartość.

Zadanie partycjonera

Zadanie partycjonowania akceptuje pary klucz-wartość z zadania mapy jako dane wejściowe. Partycja oznacza podział danych na segmenty. Zgodnie z podanymi warunkowymi kryteriami partycji, wejściowe sparowane dane klucz-wartość można podzielić na trzy części w oparciu o kryterium wieku.

Input - całe dane w zbiorze par klucz-wartość.

klucz = Wartość pola Płeć w rekordzie.

wartość = Wartość danych całego rekordu tej płci.

Method - Proces logiki partycji przebiega w następujący sposób.

  • Odczytaj wartość pola wieku z wejściowej pary klucz-wartość.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Sprawdź wartość wieku z następującymi warunkami.

    • Wiek niższy lub równy 20
    • Wiek: powyżej 20 lat i mniej niż lub równy 30.
    • Wiek powyżej 30 lat.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Całe dane par klucz-wartość są podzielone na trzy zbiory par klucz-wartość. Reducer działa indywidualnie na każdej kolekcji.

Zmniejsz liczbę zadań

Liczba zadań partycjonowania jest równa liczbie zadań reduktora. Tutaj mamy trzy zadania partycjonera, a zatem mamy do wykonania trzy zadania Reduktora.

Input - Reducer wykona trzykrotne wykonanie z różną kolekcją par klucz-wartość.

klucz = wartość pola płci w rekordzie.

wartość = wszystkie dane rekordu tej płci.

Method - Następująca logika zostanie zastosowana do każdej kolekcji.

  • Przeczytaj wartość pola Salary każdego rekordu.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Sprawdź wynagrodzenie ze zmienną max. Jeśli str [4] to maksymalna pensja, przypisz str [4] do max, w przeciwnym razie pomiń ten krok.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Powtórz kroki 1 i 2 dla każdego odbioru kluczy (Mężczyzna i Kobieta to odbiór kluczy). Po wykonaniu tych trzech kroków znajdziesz jedną maksymalną pensję z kolekcji kluczy dla mężczyzn i jedną maksymalną pensję z kolekcji kluczy dla kobiet.

context.write(new Text(key), new IntWritable(max));

Output- Na koniec otrzymasz zestaw danych par klucz-wartość w trzech kolekcjach z różnych grup wiekowych. Zawiera odpowiednio maksymalne wynagrodzenie z kolekcji Mężczyzna i maksymalne wynagrodzenie z kolekcji Kobieta w każdej grupie wiekowej.

Po wykonaniu zadań Map, Partitioner i Reduce, trzy kolekcje danych par klucz-wartość są przechowywane w trzech różnych plikach jako dane wyjściowe.

Wszystkie trzy zadania są traktowane jako zadania MapReduce. Poniższe wymagania i specyfikacje tych zadań należy określić w Konfiguracjach -

  • Nazwa pracy
  • Formaty wejściowe i wyjściowe kluczy i wartości
  • Indywidualne klasy dla zadań Map, Reduce i Partitioner
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Przykładowy program

Poniższy program pokazuje, jak zaimplementować partycje dla podanych kryteriów w programie MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Zapisz powyższy kod jako PartitionerExample.javaw „/ home / hadoop / hadoopPartitioner”. Kompilację i wykonanie programu podano poniżej.

Kompilacja i wykonanie

Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (na przykład / home / hadoop).

Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.

Step 1- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Możesz pobrać jar ze strony mvnrepository.com .

Załóżmy, że pobrany folder to „/ home / hadoop / hadoopPartitioner”

Step 2 - Poniższe polecenia służą do kompilowania programu PartitionerExample.java i stworzenie słoika dla programu.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .

Step 3 - Użyj następującego polecenia, aby utworzyć katalog wejściowy w formacie HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Użyj następującego polecenia, aby skopiować plik wejściowy o nazwie input.txt w katalogu wejściowym HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Użyj następującego polecenia, aby zweryfikować pliki w katalogu wejściowym.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Użyj następującego polecenia, aby uruchomić aplikację Top wynagrodzenie, pobierając pliki wejściowe z katalogu wejściowego.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania i zadań reduktora.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Użyj następującego polecenia, aby zweryfikować pliki wynikowe w folderze wyjściowym.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Dane wyjściowe znajdziesz w trzech plikach, ponieważ używasz w swoim programie trzech partycjonerów i trzech reduktorów.

Step 8 - Użyj następującego polecenia, aby wyświetlić dane wyjściowe w formacie Part-00000plik. Ten plik jest generowany przez HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Użyj następującego polecenia, aby wyświetlić dane wyjściowe w Part-00001 plik.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Użyj następującego polecenia, aby wyświetlić dane wyjściowe w Part-00002 plik.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000

Combiner, znany również jako semi-reducer, jest klasą opcjonalną, która działa na zasadzie akceptowania danych wejściowych z klasy Map, a następnie przekazywania wyjściowych par klucz-wartość do klasy Reducer.

Główną funkcją Combinera jest podsumowanie rekordów wyjściowych mapy za pomocą tego samego klucza. Dane wyjściowe (kolekcja klucz-wartość) sumatora zostaną przesłane przez sieć do rzeczywistego zadania Reduktora jako dane wejściowe.

Combiner

Klasa Combiner jest używana między klasą Map a klasą Reduce w celu zmniejszenia ilości przesyłanych danych między Mapami i Reduce. Zwykle wynik zadania mapy jest duży, a dane przesyłane do zadania redukcji są duże.

Poniższy diagram zadania MapReduce przedstawia fazę łączenia.

Jak działa Combiner?

Oto krótkie podsumowanie działania MapReduce Combiner -

  • Sumator nie ma predefiniowanego interfejsu i musi implementować metodę reduktora () interfejsu Reducer.

  • Na każdym kluczu wyjściowym mapy działa sumator. Musi mieć te same wyjściowe typy klucz-wartość co klasa Reducer.

  • Sumator może generować informacje podsumowujące z dużego zestawu danych, ponieważ zastępuje oryginalne dane wyjściowe mapy.

Chociaż Combiner jest opcjonalny, ale pomaga segregować dane na wiele grup w fazie Reduce, co ułatwia przetwarzanie.

Implementacja MapReduce Combiner

Poniższy przykład przedstawia teoretyczne pojęcie o kombinatorach. Załóżmy, że mamy następujący plik wejściowy o nazwieinput.txt dla MapReduce.

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

Poniżej omówiono ważne etapy działania programu MapReduce z programem Combiner.

Record Reader

Jest to pierwsza faza MapReduce, w której czytnik rekordów odczytuje każdy wiersz z wejściowego pliku tekstowego jako tekst i generuje dane wyjściowe jako pary klucz-wartość.

Input - Tekst wiersz po wierszu z pliku wejściowego.

Output- tworzy pary klucz-wartość. Poniżej przedstawiono zestaw oczekiwanych par klucz-wartość.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Faza mapy

Faza mapy pobiera dane wejściowe z czytnika rekordów, przetwarza je i generuje dane wyjściowe jako kolejny zestaw par klucz-wartość.

Input - Następująca para klucz-wartość jest wejściem pobranym z czytnika rekordów.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Faza Map odczytuje każdą parę klucz-wartość, oddziela każde słowo od wartości za pomocą StringTokenizer, traktuje każde słowo jako klucz, a liczbę tego słowa jako wartość. Poniższy fragment kodu przedstawia klasę Mapper i funkcję map.

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

Output - Oczekiwany wynik jest następujący -

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

Faza łączenia

Faza Combiner pobiera każdą parę klucz-wartość z fazy mapy, przetwarza ją i generuje dane wyjściowe jako key-value collection pary.

Input - Następująca para klucz-wartość to dane wejściowe pobrane z fazy mapy.

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

Faza Combiner odczytuje każdą parę klucz-wartość, łączy popularne słowa jako klucz i wartości jako kolekcję. Zwykle kod i operacja łącznika są podobne do kodu reduktora. Poniżej znajduje się fragment kodu dla deklaracji klas Mapper, Combiner i Reducer.

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

Output - Oczekiwany wynik jest następujący -

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Faza reduktora

Faza Reducer pobiera każdą parę kolekcji klucz-wartość z fazy Combiner, przetwarza ją i przekazuje dane wyjściowe jako pary klucz-wartość. Zauważ, że funkcjonalność Combiner jest taka sama jak Reducer.

Input - Następująca para klucz-wartość jest wejściem pobranym z fazy Combiner.

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Faza reduktora odczytuje każdą parę klucz-wartość. Poniżej znajduje się fragment kodu Combiner.

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

Output - Oczekiwana moc wyjściowa z fazy reduktora jest następująca -

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Autor nagrań

Jest to ostatnia faza MapReduce, w której zapisujący rekord zapisuje każdą parę klucz-wartość z fazy Reducer i wysyła dane wyjściowe jako tekst.

Input - Każda para klucz-wartość z fazy reduktora wraz z formatem wyjściowym.

Output- Daje pary klucz-wartość w formacie tekstowym. Poniżej przedstawiono oczekiwany wynik.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

Przykładowy program

Poniższy blok kodu zlicza liczbę słów w programie.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
		
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
		
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
		
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

Zapisz powyższy program jako WordCount.java. Kompilację i wykonanie programu podano poniżej.

Kompilacja i wykonanie

Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (na przykład / home / hadoop).

Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.

Step 1 - Użyj poniższego polecenia, aby utworzyć katalog do przechowywania skompilowanych klas Java.

$ mkdir units

Step 2- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Możesz pobrać jar ze strony mvnrepository.com .

Załóżmy, że pobrany folder to / home / hadoop /.

Step 3 - Użyj następujących poleceń, aby skompilować plik WordCount.java program i stworzyć słoik dla programu.

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

Step 4 - Użyj następującego polecenia, aby utworzyć katalog wejściowy w formacie HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Użyj następującego polecenia, aby skopiować plik wejściowy o nazwie input.txt w katalogu wejściowym HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

Step 6 - Użyj następującego polecenia, aby zweryfikować pliki w katalogu wejściowym.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Użyj następującego polecenia, aby uruchomić aplikację licznika słów, pobierając pliki wejściowe z katalogu wejściowego.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania i zadań reduktora.

Step 8 - Użyj następującego polecenia, aby zweryfikować pliki wynikowe w folderze wyjściowym.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Użyj następującego polecenia, aby wyświetlić dane wyjściowe w formacie Part-00000plik. Ten plik jest generowany przez HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Poniżej przedstawiono dane wyjściowe wygenerowane przez program MapReduce.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

W tym rozdziale opisano administrację Hadoop, która obejmuje administrację HDFS i MapReduce.

  • Administracja HDFS obejmuje monitorowanie struktury plików HDFS, lokalizacji i zaktualizowanych plików.

  • Administracja MapReduce obejmuje monitorowanie listy aplikacji, konfigurację węzłów, stan aplikacji itp.

Monitorowanie HDFS

HDFS (Hadoop Distributed File System) zawiera katalogi użytkowników, pliki wejściowe i pliki wyjściowe. Użyj poleceń MapReduce,put i get, do przechowywania i odzyskiwania.

Po uruchomieniu struktury Hadoop (demonów) przez przekazanie polecenia „start-all.sh” na „/ $ HADOOP_HOME / sbin”, przekaż następujący adres URL do przeglądarki „http: // localhost: 50070”. W przeglądarce powinien pojawić się następujący ekran.

Poniższy zrzut ekranu pokazuje, jak przeglądać pliki HDFS.

Poniższy zrzut ekranu przedstawia strukturę plików HDFS. Pokazuje pliki w katalogu „/ user / hadoop”.

Poniższy zrzut ekranu przedstawia informacje o Datanode w klastrze. Tutaj możesz znaleźć jeden węzeł z jego konfiguracjami i możliwościami.

Monitorowanie zadań MapReduce

Aplikacja MapReduce to zbiór zadań (zadanie mapowania, łączenie, partycjonowanie i zmniejszanie). Obowiązkowe jest monitorowanie i utrzymywanie następujących:

  • Konfiguracja datanode tam, gdzie aplikacja jest odpowiednia.
  • Liczba węzłów danych i zasobów używanych na aplikację.

Aby monitorować wszystkie te rzeczy, musimy mieć interfejs użytkownika. Po uruchomieniu środowiska Hadoop przez przekazanie polecenia „start-all.sh” na „/ $ HADOOP_HOME / sbin”, przekaż następujący adres URL do przeglądarki „http: // localhost: 8080”. W przeglądarce powinien pojawić się następujący ekran.

Na powyższym zrzucie ekranu wskaźnik dłoni znajduje się na identyfikatorze aplikacji. Po prostu kliknij, aby znaleźć następujący ekran w przeglądarce. Opisuje następujące -

  • Na jakim użytkowniku działa bieżąca aplikacja

  • Nazwa aplikacji

  • Rodzaj tej aplikacji

  • Stan obecny, stan ostateczny

  • Czas uruchomienia aplikacji, czas, który upłynął (czas zakończenia), jeśli jest kompletny w momencie monitorowania

  • Historia tej aplikacji, czyli informacje dziennika

  • I wreszcie informacje o węzłach, czyli węzłach, które uczestniczyły w uruchomieniu aplikacji.

Poniższy zrzut ekranu przedstawia szczegóły konkretnej aplikacji -

Poniższy zrzut ekranu przedstawia informacje o aktualnie uruchomionych węzłach. Tutaj zrzut ekranu zawiera tylko jeden węzeł. Wskaźnik dłoni pokazuje adres hosta lokalnego działającego węzła.