MapReduce - szybki przewodnik
MapReduce to model programowania do pisania aplikacji, które mogą przetwarzać duże zbiory danych równolegle na wielu węzłach. MapReduce zapewnia możliwości analityczne do analizowania ogromnych ilości złożonych danych.
Co to jest Big Data?
Big Data to zbiór dużych zbiorów danych, których nie można przetwarzać przy użyciu tradycyjnych technik obliczeniowych. Na przykład ilość danych potrzebnych Facebookowi lub YouTube do codziennego zbierania i zarządzania może należeć do kategorii Big Data. Jednak Big Data to nie tylko skala i wielkość, ale obejmuje również jeden lub więcej z następujących aspektów - prędkość, różnorodność, objętość i złożoność.
Dlaczego MapReduce?
Tradycyjne systemy korporacyjne mają zwykle scentralizowany serwer do przechowywania i przetwarzania danych. Poniższa ilustracja przedstawia schematyczny widok tradycyjnego systemu przedsiębiorstwa. Tradycyjny model z pewnością nie nadaje się do przetwarzania ogromnych ilości skalowalnych danych i nie może być dostosowany do standardowych serwerów baz danych. Co więcej, scentralizowany system tworzy zbyt duże wąskie gardło podczas jednoczesnego przetwarzania wielu plików.
Google rozwiązało ten problem z wąskim gardłem za pomocą algorytmu o nazwie MapReduce. MapReduce dzieli zadanie na małe części i przypisuje je do wielu komputerów. Później wyniki są gromadzone w jednym miejscu i integrowane w celu utworzenia zestawu danych wyników.
Jak działa MapReduce?
Algorytm MapReduce zawiera dwa ważne zadania, a mianowicie Mapowanie i Zmniejszanie.
Zadanie mapy pobiera zestaw danych i konwertuje go na inny zestaw danych, w którym poszczególne elementy są dzielone na krotki (pary klucz-wartość).
Zadanie Reduce pobiera dane wyjściowe z mapy jako dane wejściowe i łączy te krotki danych (pary klucz-wartość) w mniejszy zestaw krotek.
Zadanie redukcji jest zawsze wykonywane po utworzeniu mapy.
Przyjrzyjmy się teraz bliżej każdej z faz i spróbujmy zrozumieć ich znaczenie.
Input Phase - Tutaj mamy czytnik rekordów, który tłumaczy każdy rekord w pliku wejściowym i wysyła przeanalizowane dane do programu odwzorowującego w postaci par klucz-wartość.
Map - Map to funkcja zdefiniowana przez użytkownika, która pobiera serię par klucz-wartość i przetwarza każdą z nich, aby wygenerować zero lub więcej par klucz-wartość.
Intermediate Keys - Pary klucz-wartość generowane przez program odwzorowujący nazywane są kluczami pośrednimi.
Combiner- Sumator jest rodzajem lokalnego reduktora, który grupuje podobne dane z fazy mapy w możliwe do zidentyfikowania zbiory. Pobiera klucze pośrednie z programu odwzorowującego jako dane wejściowe i stosuje kod zdefiniowany przez użytkownika w celu agregacji wartości w małym zakresie jednego programu odwzorowującego. Nie jest częścią głównego algorytmu MapReduce; jest to opcjonalne.
Shuffle and Sort- Zadanie Reducer rozpoczyna się od kroku Shuffle and Sort. Pobiera zgrupowane pary klucz-wartość na komputer lokalny, na którym działa reduktor. Poszczególne pary klucz-wartość są sortowane według klucza w większą listę danych. Lista danych grupuje równoważne klucze razem, dzięki czemu ich wartości mogą być łatwo iterowane w zadaniu reduktora.
Reducer- Reducer pobiera pogrupowane sparowane dane klucz-wartość jako dane wejściowe i uruchamia funkcję Reduktora na każdym z nich. Tutaj dane mogą być agregowane, filtrowane i łączone na wiele sposobów, a to wymaga szerokiego zakresu przetwarzania. Po zakończeniu wykonywania daje zero lub więcej par klucz-wartość do ostatniego kroku.
Output Phase - W fazie wyjściowej mamy program formatujący dane wyjściowe, który tłumaczy końcowe pary klucz-wartość z funkcji Reducer i zapisuje je do pliku za pomocą programu do zapisywania rekordów.
Spróbujmy zrozumieć dwa zadania Map & f Reduce za pomocą małego diagramu -
MapReduce-Example
Weźmy przykład ze świata rzeczywistego, aby zrozumieć moc MapReduce. Twitter otrzymuje około 500 milionów tweetów dziennie, co daje prawie 3000 tweetów na sekundę. Poniższa ilustracja pokazuje, jak głośnik wysokotonowy zarządza swoimi tweetami za pomocą MapReduce.
Jak pokazano na ilustracji, algorytm MapReduce wykonuje następujące czynności -
Tokenize - Tokenizuje tweety na mapy tokenów i zapisuje je jako pary klucz-wartość.
Filter - Filtruje niechciane słowa z map tokenów i zapisuje przefiltrowane mapy jako pary klucz-wartość.
Count - Generuje licznik żetonów na słowo.
Aggregate Counters - Przygotowuje agregat podobnych wartości liczników w małe możliwe do zarządzania jednostki.
Algorytm MapReduce zawiera dwa ważne zadania, a mianowicie Mapowanie i Zmniejszanie.
- Zadanie mapy jest realizowane za pomocą klasy Mapper
- Zadanie redukcji jest realizowane za pomocą klasy reduktora.
Klasa Mapper pobiera dane wejściowe, tokenizuje je, mapuje i sortuje. Dane wyjściowe klasy Mapper są używane jako dane wejściowe przez klasę Reducer, która z kolei wyszukuje pasujące pary i redukuje je.
MapReduce implementuje różne algorytmy matematyczne, aby podzielić zadanie na małe części i przypisać je do wielu systemów. Pod względem technicznym algorytm MapReduce pomaga w wysyłaniu zadań Map & Reduce do odpowiednich serwerów w klastrze.
Te algorytmy matematyczne mogą obejmować:
- Sorting
- Searching
- Indexing
- TF-IDF
Sortowanie
Sortowanie jest jednym z podstawowych algorytmów MapReduce do przetwarzania i analizy danych. MapReduce implementuje algorytm sortowania, aby automatycznie sortować wyjściowe pary klucz-wartość z programu odwzorowującego według ich kluczy.
Metody sortowania są zaimplementowane w samej klasie mappera.
W fazie Shuffle and Sort, po tokenizowaniu wartości w klasie mappera, plik Context class (klasa zdefiniowana przez użytkownika) zbiera pasujące klucze o wartościach jako kolekcję.
Aby zebrać podobne pary klucz-wartość (klucze pośrednie), klasa Mapper korzysta z pomocy RawComparator klasy, aby posortować pary klucz-wartość.
Zestaw pośrednich par klucz-wartość dla danego Reducera jest automatycznie sortowany przez Hadoop w celu utworzenia par klucz-wartość (K2, {V2, V2,…}), zanim zostaną przedstawione Reducerowi.
Badawczy
Wyszukiwanie odgrywa ważną rolę w algorytmie MapReduce. Pomaga w fazie sumatora (opcjonalnie) oraz w fazie reduktora. Spróbujmy zrozumieć, jak działa wyszukiwanie, na przykładzie.
Przykład
Poniższy przykład pokazuje, jak MapReduce wykorzystuje algorytm wyszukiwania, aby poznać szczegóły pracownika, który pobiera najwyższe wynagrodzenie w danym zbiorze danych pracowników.
Załóżmy, że mamy dane pracowników w czterech różnych plikach - A, B, C i D. Załóżmy również, że we wszystkich czterech plikach znajdują się zduplikowane rekordy pracowników z powodu wielokrotnego importowania danych pracowników ze wszystkich tabel bazy danych. Zobacz poniższą ilustrację.
The Map phaseprzetwarza każdy plik wejściowy i dostarcza dane pracowników w parach klucz-wartość (<k, v>: <imię i nazwisko, wynagrodzenie>). Zobacz poniższą ilustrację.
The combiner phase(technika wyszukiwania) zaakceptuje dane wejściowe z fazy mapy jako parę klucz-wartość z nazwiskiem pracownika i jego wynagrodzeniem. Korzystając z techniki wyszukiwania, sumator sprawdzi wszystkie pensje pracowników, aby znaleźć pracownika o najwyższej pensji w każdym pliku. Zobacz poniższy fragment.
<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary
if(v(second employee).salary > Max){
Max = v(salary);
}
else{
Continue checking;
}
Oczekiwany wynik jest następujący -
|
Reducer phase- W każdym pliku znajdziesz najwyżej opłacanego pracownika. Aby uniknąć nadmiarowości, sprawdź wszystkie pary <k, v> i wyeliminuj zduplikowane wpisy, jeśli występują. Ten sam algorytm jest używany między czterema parami <k, v>, które pochodzą z czterech plików wejściowych. Ostateczny wynik powinien wyglądać następująco -
<gopal, 50000>
Indeksowanie
Zwykle indeksowanie służy do wskazywania konkretnych danych i ich adresu. Wykonuje indeksowanie wsadowe plików wejściowych dla określonego programu Mapper.
Technika indeksowania, która jest zwykle używana w MapReduce, jest znana jako inverted index.Wyszukiwarki takie jak Google i Bing używają techniki indeksowania odwróconego. Spróbujmy zrozumieć, jak działa indeksowanie, na prostym przykładzie.
Przykład
Poniższy tekst stanowi dane wejściowe dla odwróconego indeksowania. Tutaj T [0], T [1] it [2] to nazwy plików, a ich zawartość jest w cudzysłowie.
T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"
Po zastosowaniu algorytmu indeksowania otrzymujemy następujący wynik -
"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}
Tutaj „a”: {2} oznacza, że termin „a” pojawia się w pliku T [2]. Podobnie „jest”: {0, 1, 2} oznacza, że termin „jest” pojawia się w plikach T [0], T [1] i T [2].
TF-IDF
TF-IDF to algorytm przetwarzania tekstu, który jest skrótem od Term Frequency - Inverse Document Frequency. Jest to jeden z powszechnych algorytmów analizy sieci. Tutaj termin „częstotliwość” odnosi się do tego, ile razy termin pojawia się w dokumencie.
Termin Częstotliwość (TF)
Mierzy, jak często dany termin występuje w dokumencie. Oblicza się, ile razy słowo pojawia się w dokumencie, podzielone przez całkowitą liczbę słów w tym dokumencie.
TF(the) = (Number of times term the ‘the’ appears in a document) / (Total number of terms in the document)
Odwrotna częstotliwość dokumentu (IDF)
Mierzy znaczenie terminu. Jest obliczany przez liczbę dokumentów w tekstowej bazie danych podzieloną przez liczbę dokumentów, w których występuje określony termin.
Podczas obliczania TF wszystkie terminy są uważane za równie ważne. Oznacza to, że TF liczy częstotliwość terminów dla normalnych słów, takich jak „jest”, „a”, „co” itp. Dlatego musimy znać częste terminy podczas skalowania w górę rzadkich, obliczając następujące -
IDF(the) = log_e(Total number of documents / Number of documents with term ‘the’ in it).
Algorytm jest wyjaśniony poniżej za pomocą małego przykładu.
Przykład
Rozważmy dokument zawierający 1000 słów, w tym słowo hivepojawia się 50 razy. TF dlahive jest wtedy (50/1000) = 0,05.
Teraz załóżmy, że mamy 10 milionów dokumentów i słowo hivepojawia się w 1000 z nich. Następnie IDF jest obliczany jako log (10 000 000/1 000) = 4.
Iloczynem tych wielkości jest waga TF-IDF - 0,05 × 4 = 0,20.
MapReduce działa tylko w systemach operacyjnych o smaku Linux i ma wbudowaną strukturę Hadoop. Aby zainstalować framework Hadoop, musimy wykonać następujące kroki.
Weryfikacja instalacji JAVA
Javę należy zainstalować w systemie przed zainstalowaniem Hadoop. Użyj następującego polecenia, aby sprawdzić, czy w systemie jest zainstalowana Java.
$ java –version
Jeśli Java jest już zainstalowana w twoim systemie, zobaczysz następującą odpowiedź -
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
Jeśli nie masz zainstalowanej Javy w swoim systemie, wykonaj kroki podane poniżej.
Instalowanie Java
Krok 1
Pobierz najnowszą wersję Java z poniższego linku - ten link .
Po pobraniu możesz zlokalizować plik jdk-7u71-linux-x64.tar.gz w folderze Pobrane.
Krok 2
Użyj następujących poleceń, aby wyodrębnić zawartość jdk-7u71-linux-x64.gz.
$ cd Downloads/
$ ls jdk-7u71-linux-x64.gz $ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz
Krok 3
Aby udostępnić Javę wszystkim użytkownikom, musisz przenieść ją do lokalizacji „/ usr / local /”. Przejdź do roota i wpisz następujące polecenia -
$ su
password:
# mv jdk1.7.0_71 /usr/local/java
# exit
Krok 4
Aby ustawić zmienne PATH i JAVA_HOME, dodaj następujące polecenia do pliku ~ / .bashrc.
export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin
Zastosuj wszystkie zmiany do aktualnie działającego systemu.
$ source ~/.bashrc
Krok 5
Użyj następujących poleceń, aby skonfigurować alternatywy Java -
# alternatives --install /usr/bin/java java usr/local/java/bin/java 2
# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2
# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2
# alternatives --set java usr/local/java/bin/java
# alternatives --set javac usr/local/java/bin/javac
# alternatives --set jar usr/local/java/bin/jar
Teraz sprawdź instalację za pomocą polecenia java -version z terminala.
Weryfikacja instalacji Hadoop
Hadoop musi być zainstalowany w systemie przed zainstalowaniem MapReduce. Zweryfikujmy instalację Hadoop za pomocą następującego polecenia -
$ hadoop version
Jeśli Hadoop jest już zainstalowany w twoim systemie, otrzymasz następującą odpowiedź -
Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4
Jeśli usługa Hadoop nie jest zainstalowana w systemie, wykonaj następujące czynności.
Pobieranie Hadoop
Pobierz Hadoop 2.4.1 z Apache Software Foundation i wyodrębnij jego zawartość za pomocą następujących poleceń.
$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit
Instalowanie Hadoop w trybie pseudo-rozproszonym
Poniższe kroki służą do instalowania Hadoop 2.4.1 w trybie pseudo rozproszonym.
Krok 1 - Konfiguracja Hadoop
Możesz ustawić zmienne środowiskowe Hadoop, dołączając następujące polecenia do pliku ~ / .bashrc.
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
Zastosuj wszystkie zmiany do aktualnie działającego systemu.
$ source ~/.bashrc
Krok 2 - Konfiguracja Hadoop
Wszystkie pliki konfiguracyjne Hadoop można znaleźć w lokalizacji „$ HADOOP_HOME / etc / hadoop”. Musisz wprowadzić odpowiednie zmiany w tych plikach konfiguracyjnych zgodnie z infrastrukturą Hadoop.
$ cd $HADOOP_HOME/etc/hadoop
Aby tworzyć programy Hadoop przy użyciu języka Java, musisz zresetować zmienne środowiskowe Java w hadoop-env.sh plik, zastępując wartość JAVA_HOME lokalizacją Java w systemie.
export JAVA_HOME=/usr/local/java
Aby skonfigurować Hadoop, musisz edytować następujące pliki -
- core-site.xml
- hdfs-site.xml
- yarn-site.xml
- mapred-site.xml
core-site.xml
core-site.xml zawiera następujące informacje -
- Numer portu używany dla wystąpienia Hadoop
- Pamięć przydzielona dla systemu plików
- Limit pamięci do przechowywania danych
- Rozmiar buforów do odczytu / zapisu
Otwórz plik core-site.xml i dodaj następujące właściwości między tagami <configuration> i </configuration>.
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000 </value>
</property>
</configuration>
hdfs-site.xml
hdfs-site.xml zawiera następujące informacje -
- Wartość danych replikacji
- Ścieżka do namenode
- Ścieżka danych w lokalnych systemach plików (miejsce, w którym chcesz przechowywać infra Hadoop)
Załóżmy następujące dane.
dfs.replication (data replication value) = 1
(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode
(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode
Otwórz ten plik i dodaj następujące właściwości między tagami <configuration>, </configuration>.
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
</property>
</configuration>
Note - W powyższym pliku wszystkie wartości właściwości są zdefiniowane przez użytkownika i można wprowadzać zmiany zgodnie z infrastrukturą Hadoop.
yarn-site.xml
Ten plik służy do konfigurowania przędzy w Hadoop. Otwórz plik yarn-site.xml i dodaj następujące właściwości między tagami <configuration>, </configuration>.
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
mapred-site.xml
Ten plik służy do określenia używanej przez nas struktury MapReduce. Domyślnie Hadoop zawiera szablon yarn-site.xml. Przede wszystkim musisz skopiować plik z mapred-site.xml.template do pliku mapred-site.xml za pomocą następującego polecenia.
$ cp mapred-site.xml.template mapred-site.xml
Otwórz plik mapred-site.xml i dodaj następujące właściwości między tagami <configuration>, </configuration>.
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
Weryfikacja instalacji Hadoop
Poniższe kroki służą do weryfikacji instalacji Hadoop.
Krok 1 - Konfiguracja nazwy węzła
Skonfiguruj namenode za pomocą polecenia „hdfs namenode -format” w następujący sposób -
$ cd ~ $ hdfs namenode -format
Oczekiwany wynik jest następujący -
10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/
Krok 2 - Weryfikacja plików dfs na platformie Hadoop
Wykonaj następujące polecenie, aby uruchomić system plików Hadoop.
$ start-dfs.sh
Oczekiwany wynik jest następujący -
10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]
Krok 3 - weryfikacja skryptu przędzy
Następujące polecenie służy do uruchamiania skryptu przędzy. Wykonanie tego polecenia spowoduje uruchomienie demonów przędzy.
$ start-yarn.sh
Oczekiwany wynik jest następujący -
starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out
Krok 4 - Dostęp do Hadoop w przeglądarce
Domyślny numer portu dostępu do Hadoop to 50070. Użyj następującego adresu URL, aby pobrać usługi Hadoop w przeglądarce.
http://localhost:50070/
Poniższy zrzut ekranu przedstawia przeglądarkę Hadoop.
Krok 5 - Zweryfikuj wszystkie aplikacje klastra
Domyślny numer portu dostępu do wszystkich aplikacji klastra to 8088. Aby skorzystać z tej usługi, użyj następującego adresu URL.
http://localhost:8088/
Poniższy zrzut ekranu przedstawia przeglądarkę klastra Hadoop.
W tym rozdziale przyjrzymy się bliżej klasom i ich metodom, które są zaangażowane w operacje programowania MapReduce. Będziemy przede wszystkim skupiać się na następujących kwestiach -
- Interfejs JobContext
- Klasa zawodu
- Mapper Class
- Klasa reduktora
Interfejs JobContext
Interfejs JobContext jest super interfejsem dla wszystkich klas, który definiuje różne zadania w MapReduce. Zapewnia widok tylko do odczytu zadania dostarczonego do zadań podczas ich działania.
Poniżej przedstawiono interfejsy podrzędne interfejsu JobContext.
S.No. | Opis podinterfejsu |
---|---|
1. | MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> Definiuje kontekst, który jest nadawany Mapperowi. |
2. | ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> Definiuje kontekst, który jest przekazywany do Reducer. |
Klasa zadania to główna klasa implementująca interfejs JobContext.
Klasa zawodu
Klasa Job jest najważniejszą klasą interfejsu API MapReduce. Umożliwia użytkownikowi skonfigurowanie zadania, przesłanie go, kontrolę jego wykonania i sprawdzenie stanu. Metody set działają tylko do momentu przesłania zadania, po czym rzucą wyjątek IllegalStateException.
Zwykle użytkownik tworzy aplikację, opisuje różne aspekty zadania, a następnie przesyła zadanie i monitoruje jego postęp.
Oto przykład, jak przesłać ofertę pracy -
// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);
// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
Konstruktorzy
Poniżej znajduje się podsumowanie konstruktora klasy Job.
S.Nr | Podsumowanie konstruktora |
---|---|
1 | Job() |
2 | Job(Konfiguracja) |
3 | Job(Konfiguracja konfiguracji, String nazwa_zadania) |
Metody
Oto niektóre z ważnych metod klasy Job:
S.Nr | Opis metody |
---|---|
1 | getJobName() Nazwa zadania określona przez użytkownika. |
2 | getJobState() Zwraca bieżący stan zadania. |
3 | isComplete() Sprawdza, czy zadanie jest zakończone, czy nie. |
4 | setInputFormatClass() Ustawia InputFormat dla zadania. |
5 | setJobName(String name) Ustawia nazwę zadania określoną przez użytkownika. |
6 | setOutputFormatClass() Ustawia format wyjściowy zadania. |
7 | setMapperClass(Class) Ustawia Mapper dla zadania. |
8 | setReducerClass(Class) Ustawia reduktor dla zadania. |
9 | setPartitionerClass(Class) Ustawia partycjoner dla zadania. |
10 | setCombinerClass(Class) Ustawia łącznik dla zadania. |
Mapper Class
Klasa Mapper definiuje zadanie Map. Mapuje wejściowe pary klucz-wartość na zestaw pośrednich par klucz-wartość. Mapy to indywidualne zadania, które przekształcają rekordy wejściowe w rekordy pośrednie. Przekształcone rekordy pośrednie nie muszą być tego samego typu co rekordy wejściowe. Dana para wejściowa może być mapowana do zera lub wielu par wyjściowych.
metoda
mapjest najważniejszą metodą klasy Mapper. Składnia jest zdefiniowana poniżej -
map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)
Ta metoda jest wywoływana raz dla każdej pary klucz-wartość w podziale danych wejściowych.
Klasa reduktora
Klasa Reducer definiuje zadanie Reduce w MapReduce. Redukuje zbiór wartości pośrednich, które mają wspólny klucz, do mniejszego zestawu wartości. Implementacje reduktora mogą uzyskać dostęp do konfiguracji zadania za pośrednictwem metody JobContext.getConfiguration (). Reduktor ma trzy podstawowe fazy - Tasuj, Sortuj i Zmniejsz.
Shuffle - Reduktor kopiuje posortowane dane wyjściowe z każdego programu mapującego przy użyciu protokołu HTTP w całej sieci.
Sort- Framework scala-sortuje dane wejściowe Reduktora według kluczy (ponieważ różni Mapperzy mogą wyprowadzać ten sam klucz). Fazy tasowania i sortowania występują jednocześnie, tj. Podczas pobierania danych wyjściowych są one łączone.
Reduce - W tej fazie dla każdego <klucz, (zbiór wartości)> w posortowanych danych wejściowych wywoływana jest metoda redukuj (obiekt, iterowalny, kontekst).
metoda
reducejest najważniejszą metodą klasy Reducer. Składnia jest zdefiniowana poniżej -
reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)
Ta metoda jest wywoływana raz dla każdego klucza w kolekcji par klucz-wartość.
MapReduce to platforma służąca do pisania aplikacji w celu niezawodnego przetwarzania ogromnych ilości danych na dużych klastrach standardowego sprzętu. W tym rozdziale omówiono działanie MapReduce w środowisku Hadoop przy użyciu języka Java.
Algorytm MapReduce
Zasadniczo paradygmat MapReduce polega na wysyłaniu programów zmniejszających mapy do komputerów, na których znajdują się rzeczywiste dane.
Podczas zadania MapReduce Hadoop wysyła zadania Map i Reduce do odpowiednich serwerów w klastrze.
Struktura zarządza wszystkimi szczegółami przekazywania danych, takimi jak wydawanie zadań, weryfikacja ukończenia zadań i kopiowanie danych w obrębie klastra między węzłami.
Większość obliczeń odbywa się w węzłach z danymi na dyskach lokalnych, co zmniejsza ruch w sieci.
Po wykonaniu danego zadania klaster zbiera i redukuje dane do odpowiedniego wyniku i odsyła je z powrotem do serwera Hadoop.
Dane wejściowe i wyjściowe (perspektywa Java)
Struktura MapReduce działa na parach klucz-wartość, to znaczy, że struktura wyświetla dane wejściowe zadania jako zestaw par klucz-wartość i tworzy zestaw par klucz-wartość jako dane wyjściowe zadania, prawdopodobnie różnych typów.
Klasy klucza i wartości muszą być możliwe do serializacji przez platformę i dlatego wymagane jest zaimplementowanie interfejsu Writable. Ponadto kluczowe klasy muszą implementować interfejs WritableComparable, aby ułatwić sortowanie według struktury.
Zarówno format wejściowy, jak i wyjściowy zadania MapReduce mają postać par klucz-wartość -
(Wejście) <k1, v1> -> map -> <k2, v2> -> zredukuj -> <k3, v3> (wyjście).
Wejście | Wynik | |
---|---|---|
Mapa | <k1, v1> | lista (<k2, v2>) |
Zmniejszyć | <k2, list (v2)> | lista (<k3, v3>) |
Implementacja MapReduce
Poniższa tabela przedstawia dane dotyczące zużycia energii elektrycznej przez organizację. Tabela zawiera miesięczne zużycie energii elektrycznej oraz średnią roczną z pięciu kolejnych lat.
Jan | Luty | Zniszczyć | Kwi | Może | Jun | Lip | Sie | Wrz | Paź | Lis | Grudzień | Śr | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Musimy napisać aplikacje, które przetwarzają dane wejściowe w podanej tabeli, aby znaleźć rok maksymalnego użycia, rok minimalnego użytkowania i tak dalej. Zadanie to jest łatwe dla programistów ze skończoną liczbą rekordów, ponieważ po prostu napiszą logikę, aby wygenerować wymagane dane wyjściowe, i przekażą dane do napisanej aplikacji.
Podnieśmy teraz skalę danych wejściowych. Załóżmy, że musimy przeanalizować zużycie energii elektrycznej we wszystkich gałęziach przemysłu na dużą skalę w danym stanie. Kiedy piszemy aplikacje do przetwarzania takich danych zbiorczych,
Wykonanie ich zajmie dużo czasu.
Podczas przenoszenia danych ze źródła na serwer sieciowy będzie duży ruch w sieci.
Aby rozwiązać te problemy, mamy strukturę MapReduce.
Dane wejściowe
Powyższe dane są zapisywane jako sample.txti podane jako dane wejściowe. Plik wejściowy wygląda jak pokazano poniżej.
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Przykładowy program
Poniższy program dla przykładowych danych używa struktury MapReduce.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Zapisz powyższy program w ProcessUnits.java. Kompilację i wykonanie programu podano poniżej.
Kompilacja i wykonanie programu ProcessUnits
Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (np. / Home / hadoop).
Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.
Step 1 - Użyj poniższego polecenia, aby utworzyć katalog do przechowywania skompilowanych klas Java.
$ mkdir units
Step 2- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Pobierz jar ze strony mvnrepository.com . Załóżmy, że folder pobierania to / home / hadoop /.
Step 3 - Poniższe polecenia służą do kompilowania pliku ProcessUnits.java program i stworzyć słoik dla programu.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 - Następujące polecenie służy do tworzenia katalogu wejściowego w formacie HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - Następujące polecenie służy do kopiowania pliku wejściowego o nazwie sample.txt w katalogu wejściowym HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 - Następujące polecenie służy do weryfikacji plików w katalogu wejściowym
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - Następujące polecenie służy do uruchamiania aplikacji Eleunit_max poprzez pobieranie plików wejściowych z katalogu wejściowego.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania, zadań reduktora itp.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 - Następujące polecenie służy do weryfikacji plików wynikowych w folderze wyjściowym.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - Następujące polecenie służy do wyświetlania danych wyjściowych w formacie Part-00000plik. Ten plik jest generowany przez HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Poniżej przedstawiono dane wyjściowe wygenerowane przez program MapReduce -
1981 | 34 |
1984 | 40 |
1985 | 45 |
Step 10 - Następujące polecenie służy do kopiowania folderu wyjściowego z HDFS do lokalnego systemu plików.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop
Partycjoner działa jak warunek podczas przetwarzania wejściowego zestawu danych. Faza podziału ma miejsce po fazie mapy i przed fazą redukcji.
Liczba partycji jest równa liczbie reduktorów. Oznacza to, że partycjoner podzieli dane zgodnie z liczbą reduktorów. Dlatego dane przekazywane z jednego partycjonera są przetwarzane przez jeden reduktor.
Partitioner
Partycjoner dzieli pary klucz-wartość pośrednich wyjść mapy. Dzieli dane na partycje za pomocą warunku zdefiniowanego przez użytkownika, który działa jak funkcja skrótu. Całkowita liczba partycji jest taka sama, jak liczba zadań reduktora dla zadania. Weźmy przykład, aby zrozumieć, jak działa partycjoner.
Implementacja MapReduce Partitioner
Dla wygody załóżmy, że mamy małą tabelkę o nazwie Pracownik z następującymi danymi. Użyjemy tych przykładowych danych jako naszego zestawu danych wejściowych, aby zademonstrować, jak działa partycjoner.
ID | Nazwa | Wiek | Płeć | Wynagrodzenie |
---|---|---|---|---|
1201 | gopal | 45 | Męski | 50 000 |
1202 | manisha | 40 | Płeć żeńska | 50 000 |
1203 | khalil | 34 | Męski | 30 000 |
1204 | prasanth | 30 | Męski | 30 000 |
1205 | kiran | 20 | Męski | 40 000 |
1206 | laxmi | 25 | Płeć żeńska | 35 000 |
1207 | bhavya | 20 | Płeć żeńska | 15 000 |
1208 | reshma | 19 | Płeć żeńska | 15 000 |
1209 | kranthi | 22 | Męski | 22 000 |
1210 | Satish | 24 | Męski | 25 000 |
1211 | Kryszna | 25 | Męski | 25 000 |
1212 | Arshad | 28 | Męski | 20 000 |
1213 | Lavanya | 18 | Płeć żeńska | 8,000 |
Musimy napisać aplikację do przetwarzania zbioru danych wejściowych, aby znaleźć pracownika o najwyższym wynagrodzeniu według płci w różnych grupach wiekowych (na przykład poniżej 20 lat, od 21 do 30 lat, powyżej 30 lat).
Dane wejściowe
Powyższe dane są zapisywane jako input.txt w katalogu „/ home / hadoop / hadoopPartitioner” i podany jako dane wejściowe.
1201 | gopal | 45 | Męski | 50000 |
1202 | manisha | 40 | Płeć żeńska | 51000 |
1203 | khaleel | 34 | Męski | 30000 |
1204 | prasanth | 30 | Męski | 31000 |
1205 | kiran | 20 | Męski | 40000 |
1206 | laxmi | 25 | Płeć żeńska | 35000 |
1207 | bhavya | 20 | Płeć żeńska | 15000 |
1208 | reshma | 19 | Płeć żeńska | 14000 |
1209 | kranthi | 22 | Męski | 22000 |
1210 | Satish | 24 | Męski | 25000 |
1211 | Kryszna | 25 | Męski | 26000 |
1212 | Arshad | 28 | Męski | 20000 |
1213 | Lavanya | 18 | Płeć żeńska | 8000 |
Na podstawie podanych danych wejściowych następuje algorytmiczne wyjaśnienie programu.
Zadania mapy
Zadanie mapy akceptuje pary klucz-wartość jako dane wejściowe, podczas gdy dane tekstowe są w pliku tekstowym. Dane wejściowe dla tego zadania mapy są następujące -
Input - Klucz byłby wzorcem, takim jak „dowolny klucz specjalny + nazwa pliku + numer wiersza” (przykład: klucz = @ input1), a wartością byłyby dane w tym wierszu (przykład: wartość = 1201 \ t gopal \ t 45 \ t Mężczyzna \ t 50000).
Method - Działanie tego zadania mapy jest następujące -
Przeczytać value (dane rekordu), która jest wartością wejściową z listy argumentów w ciągu.
Używając funkcji split, oddziel płeć i zapisz w zmiennej ciągu.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
Wyślij informacje o płci i dane rekordu value jako wyjściowa para klucz-wartość z zadania mapy do pliku partition task.
context.write(new Text(gender), new Text(value));
Powtórz wszystkie powyższe kroki dla wszystkich rekordów w pliku tekstowym.
Output - Otrzymasz dane dotyczące płci i wartości danych rekordu jako pary klucz-wartość.
Zadanie partycjonera
Zadanie partycjonowania akceptuje pary klucz-wartość z zadania mapy jako dane wejściowe. Partycja oznacza podział danych na segmenty. Zgodnie z podanymi warunkowymi kryteriami partycji, wejściowe sparowane dane klucz-wartość można podzielić na trzy części w oparciu o kryterium wieku.
Input - całe dane w zbiorze par klucz-wartość.
klucz = Wartość pola Płeć w rekordzie.
wartość = Wartość danych całego rekordu tej płci.
Method - Proces logiki partycji przebiega w następujący sposób.
- Odczytaj wartość pola wieku z wejściowej pary klucz-wartość.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Sprawdź wartość wieku z następującymi warunkami.
- Wiek niższy lub równy 20
- Wiek: powyżej 20 lat i mniej niż lub równy 30.
- Wiek powyżej 30 lat.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- Całe dane par klucz-wartość są podzielone na trzy zbiory par klucz-wartość. Reducer działa indywidualnie na każdej kolekcji.
Zmniejsz liczbę zadań
Liczba zadań partycjonowania jest równa liczbie zadań reduktora. Tutaj mamy trzy zadania partycjonera, a zatem mamy do wykonania trzy zadania Reduktora.
Input - Reducer wykona trzykrotne wykonanie z różną kolekcją par klucz-wartość.
klucz = wartość pola płci w rekordzie.
wartość = wszystkie dane rekordu tej płci.
Method - Następująca logika zostanie zastosowana do każdej kolekcji.
- Przeczytaj wartość pola Salary każdego rekordu.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
Sprawdź wynagrodzenie ze zmienną max. Jeśli str [4] to maksymalna pensja, przypisz str [4] do max, w przeciwnym razie pomiń ten krok.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Powtórz kroki 1 i 2 dla każdego odbioru kluczy (Mężczyzna i Kobieta to odbiór kluczy). Po wykonaniu tych trzech kroków znajdziesz jedną maksymalną pensję z kolekcji kluczy dla mężczyzn i jedną maksymalną pensję z kolekcji kluczy dla kobiet.
context.write(new Text(key), new IntWritable(max));
Output- Na koniec otrzymasz zestaw danych par klucz-wartość w trzech kolekcjach z różnych grup wiekowych. Zawiera odpowiednio maksymalne wynagrodzenie z kolekcji Mężczyzna i maksymalne wynagrodzenie z kolekcji Kobieta w każdej grupie wiekowej.
Po wykonaniu zadań Map, Partitioner i Reduce, trzy kolekcje danych par klucz-wartość są przechowywane w trzech różnych plikach jako dane wyjściowe.
Wszystkie trzy zadania są traktowane jako zadania MapReduce. Poniższe wymagania i specyfikacje tych zadań należy określić w Konfiguracjach -
- Nazwa pracy
- Formaty wejściowe i wyjściowe kluczy i wartości
- Indywidualne klasy dla zadań Map, Reduce i Partitioner
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Przykładowy program
Poniższy program pokazuje, jak zaimplementować partycje dla podanych kryteriów w programie MapReduce.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
Zapisz powyższy kod jako PartitionerExample.javaw „/ home / hadoop / hadoopPartitioner”. Kompilację i wykonanie programu podano poniżej.
Kompilacja i wykonanie
Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (na przykład / home / hadoop).
Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.
Step 1- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Możesz pobrać jar ze strony mvnrepository.com .
Załóżmy, że pobrany folder to „/ home / hadoop / hadoopPartitioner”
Step 2 - Poniższe polecenia służą do kompilowania programu PartitionerExample.java i stworzenie słoika dla programu.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .
Step 3 - Użyj następującego polecenia, aby utworzyć katalog wejściowy w formacie HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - Użyj następującego polecenia, aby skopiować plik wejściowy o nazwie input.txt w katalogu wejściowym HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - Użyj następującego polecenia, aby zweryfikować pliki w katalogu wejściowym.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - Użyj następującego polecenia, aby uruchomić aplikację Top wynagrodzenie, pobierając pliki wejściowe z katalogu wejściowego.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania i zadań reduktora.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - Użyj następującego polecenia, aby zweryfikować pliki wynikowe w folderze wyjściowym.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Dane wyjściowe znajdziesz w trzech plikach, ponieważ używasz w swoim programie trzech partycjonerów i trzech reduktorów.
Step 8 - Użyj następującego polecenia, aby wyświetlić dane wyjściowe w formacie Part-00000plik. Ten plik jest generowany przez HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
Użyj następującego polecenia, aby wyświetlić dane wyjściowe w Part-00001 plik.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
Użyj następującego polecenia, aby wyświetlić dane wyjściowe w Part-00002 plik.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000
Combiner, znany również jako semi-reducer, jest klasą opcjonalną, która działa na zasadzie akceptowania danych wejściowych z klasy Map, a następnie przekazywania wyjściowych par klucz-wartość do klasy Reducer.
Główną funkcją Combinera jest podsumowanie rekordów wyjściowych mapy za pomocą tego samego klucza. Dane wyjściowe (kolekcja klucz-wartość) sumatora zostaną przesłane przez sieć do rzeczywistego zadania Reduktora jako dane wejściowe.
Combiner
Klasa Combiner jest używana między klasą Map a klasą Reduce w celu zmniejszenia ilości przesyłanych danych między Mapami i Reduce. Zwykle wynik zadania mapy jest duży, a dane przesyłane do zadania redukcji są duże.
Poniższy diagram zadania MapReduce przedstawia fazę łączenia.
Jak działa Combiner?
Oto krótkie podsumowanie działania MapReduce Combiner -
Sumator nie ma predefiniowanego interfejsu i musi implementować metodę reduktora () interfejsu Reducer.
Na każdym kluczu wyjściowym mapy działa sumator. Musi mieć te same wyjściowe typy klucz-wartość co klasa Reducer.
Sumator może generować informacje podsumowujące z dużego zestawu danych, ponieważ zastępuje oryginalne dane wyjściowe mapy.
Chociaż Combiner jest opcjonalny, ale pomaga segregować dane na wiele grup w fazie Reduce, co ułatwia przetwarzanie.
Implementacja MapReduce Combiner
Poniższy przykład przedstawia teoretyczne pojęcie o kombinatorach. Załóżmy, że mamy następujący plik wejściowy o nazwieinput.txt dla MapReduce.
What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance
Poniżej omówiono ważne etapy działania programu MapReduce z programem Combiner.
Record Reader
Jest to pierwsza faza MapReduce, w której czytnik rekordów odczytuje każdy wiersz z wejściowego pliku tekstowego jako tekst i generuje dane wyjściowe jako pary klucz-wartość.
Input - Tekst wiersz po wierszu z pliku wejściowego.
Output- tworzy pary klucz-wartość. Poniżej przedstawiono zestaw oczekiwanych par klucz-wartość.
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
Faza mapy
Faza mapy pobiera dane wejściowe z czytnika rekordów, przetwarza je i generuje dane wyjściowe jako kolejny zestaw par klucz-wartość.
Input - Następująca para klucz-wartość jest wejściem pobranym z czytnika rekordów.
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
Faza Map odczytuje każdą parę klucz-wartość, oddziela każde słowo od wartości za pomocą StringTokenizer, traktuje każde słowo jako klucz, a liczbę tego słowa jako wartość. Poniższy fragment kodu przedstawia klasę Mapper i funkcję map.
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Output - Oczekiwany wynik jest następujący -
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
Faza łączenia
Faza Combiner pobiera każdą parę klucz-wartość z fazy mapy, przetwarza ją i generuje dane wyjściowe jako key-value collection pary.
Input - Następująca para klucz-wartość to dane wejściowe pobrane z fazy mapy.
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
Faza Combiner odczytuje każdą parę klucz-wartość, łączy popularne słowa jako klucz i wartości jako kolekcję. Zwykle kod i operacja łącznika są podobne do kodu reduktora. Poniżej znajduje się fragment kodu dla deklaracji klas Mapper, Combiner i Reducer.
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
Output - Oczekiwany wynik jest następujący -
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Faza reduktora
Faza Reducer pobiera każdą parę kolekcji klucz-wartość z fazy Combiner, przetwarza ją i przekazuje dane wyjściowe jako pary klucz-wartość. Zauważ, że funkcjonalność Combiner jest taka sama jak Reducer.
Input - Następująca para klucz-wartość jest wejściem pobranym z fazy Combiner.
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Faza reduktora odczytuje każdą parę klucz-wartość. Poniżej znajduje się fragment kodu Combiner.
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Output - Oczekiwana moc wyjściowa z fazy reduktora jest następująca -
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Autor nagrań
Jest to ostatnia faza MapReduce, w której zapisujący rekord zapisuje każdą parę klucz-wartość z fazy Reducer i wysyła dane wyjściowe jako tekst.
Input - Każda para klucz-wartość z fazy reduktora wraz z formatem wyjściowym.
Output- Daje pary klucz-wartość w formacie tekstowym. Poniżej przedstawiono oczekiwany wynik.
What 3
do 2
you 2
mean 1
by 1
Object 1
know 1
about 1
Java 3
is 1
Virtual 1
Machine 1
How 1
enabled 1
High 1
Performance 1
Przykładowy program
Poniższy blok kodu zlicza liczbę słów w programie.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Zapisz powyższy program jako WordCount.java. Kompilację i wykonanie programu podano poniżej.
Kompilacja i wykonanie
Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (na przykład / home / hadoop).
Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.
Step 1 - Użyj poniższego polecenia, aby utworzyć katalog do przechowywania skompilowanych klas Java.
$ mkdir units
Step 2- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Możesz pobrać jar ze strony mvnrepository.com .
Załóżmy, że pobrany folder to / home / hadoop /.
Step 3 - Użyj następujących poleceń, aby skompilować plik WordCount.java program i stworzyć słoik dla programu.
$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .
Step 4 - Użyj następującego polecenia, aby utworzyć katalog wejściowy w formacie HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - Użyj następującego polecenia, aby skopiować plik wejściowy o nazwie input.txt w katalogu wejściowym HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir
Step 6 - Użyj następującego polecenia, aby zweryfikować pliki w katalogu wejściowym.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - Użyj następującego polecenia, aby uruchomić aplikację licznika słów, pobierając pliki wejściowe z katalogu wejściowego.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania i zadań reduktora.
Step 8 - Użyj następującego polecenia, aby zweryfikować pliki wynikowe w folderze wyjściowym.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - Użyj następującego polecenia, aby wyświetlić dane wyjściowe w formacie Part-00000plik. Ten plik jest generowany przez HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Poniżej przedstawiono dane wyjściowe wygenerowane przez program MapReduce.
What 3
do 2
you 2
mean 1
by 1
Object 1
know 1
about 1
Java 3
is 1
Virtual 1
Machine 1
How 1
enabled 1
High 1
Performance 1
W tym rozdziale opisano administrację Hadoop, która obejmuje administrację HDFS i MapReduce.
Administracja HDFS obejmuje monitorowanie struktury plików HDFS, lokalizacji i zaktualizowanych plików.
Administracja MapReduce obejmuje monitorowanie listy aplikacji, konfigurację węzłów, stan aplikacji itp.
Monitorowanie HDFS
HDFS (Hadoop Distributed File System) zawiera katalogi użytkowników, pliki wejściowe i pliki wyjściowe. Użyj poleceń MapReduce,put i get, do przechowywania i odzyskiwania.
Po uruchomieniu struktury Hadoop (demonów) przez przekazanie polecenia „start-all.sh” na „/ $ HADOOP_HOME / sbin”, przekaż następujący adres URL do przeglądarki „http: // localhost: 50070”. W przeglądarce powinien pojawić się następujący ekran.
Poniższy zrzut ekranu pokazuje, jak przeglądać pliki HDFS.
Poniższy zrzut ekranu przedstawia strukturę plików HDFS. Pokazuje pliki w katalogu „/ user / hadoop”.
Poniższy zrzut ekranu przedstawia informacje o Datanode w klastrze. Tutaj możesz znaleźć jeden węzeł z jego konfiguracjami i możliwościami.
Monitorowanie zadań MapReduce
Aplikacja MapReduce to zbiór zadań (zadanie mapowania, łączenie, partycjonowanie i zmniejszanie). Obowiązkowe jest monitorowanie i utrzymywanie następujących:
- Konfiguracja datanode tam, gdzie aplikacja jest odpowiednia.
- Liczba węzłów danych i zasobów używanych na aplikację.
Aby monitorować wszystkie te rzeczy, musimy mieć interfejs użytkownika. Po uruchomieniu środowiska Hadoop przez przekazanie polecenia „start-all.sh” na „/ $ HADOOP_HOME / sbin”, przekaż następujący adres URL do przeglądarki „http: // localhost: 8080”. W przeglądarce powinien pojawić się następujący ekran.
Na powyższym zrzucie ekranu wskaźnik dłoni znajduje się na identyfikatorze aplikacji. Po prostu kliknij, aby znaleźć następujący ekran w przeglądarce. Opisuje następujące -
Na jakim użytkowniku działa bieżąca aplikacja
Nazwa aplikacji
Rodzaj tej aplikacji
Stan obecny, stan ostateczny
Czas uruchomienia aplikacji, czas, który upłynął (czas zakończenia), jeśli jest kompletny w momencie monitorowania
Historia tej aplikacji, czyli informacje dziennika
I wreszcie informacje o węzłach, czyli węzłach, które uczestniczyły w uruchomieniu aplikacji.
Poniższy zrzut ekranu przedstawia szczegóły konkretnej aplikacji -
Poniższy zrzut ekranu przedstawia informacje o aktualnie uruchomionych węzłach. Tutaj zrzut ekranu zawiera tylko jeden węzeł. Wskaźnik dłoni pokazuje adres hosta lokalnego działającego węzła.