MapReduce - implementacja Hadoop
MapReduce to framework, który jest używany do pisania aplikacji w celu niezawodnego przetwarzania ogromnych ilości danych na dużych klastrach standardowego sprzętu. Ten rozdział przedstawia działanie MapReduce w środowisku Hadoop przy użyciu języka Java.
Algorytm MapReduce
Zasadniczo paradygmat MapReduce polega na wysyłaniu programów zmniejszających mapy do komputerów, na których znajdują się rzeczywiste dane.
Podczas zadania MapReduce Hadoop wysyła zadania Map i Reduce do odpowiednich serwerów w klastrze.
Struktura zarządza wszystkimi szczegółami przekazywania danych, takimi jak wydawanie zadań, weryfikacja wykonania zadań i kopiowanie danych w obrębie klastra między węzłami.
Większość obliczeń odbywa się w węzłach z danymi na dyskach lokalnych, co zmniejsza ruch w sieci.
Po wykonaniu danego zadania klaster zbiera i redukuje dane do odpowiedniego wyniku i odsyła je z powrotem na serwer Hadoop.
Dane wejściowe i wyjściowe (perspektywa Java)
Struktura MapReduce działa na parach klucz-wartość, co oznacza, że struktura wyświetla dane wejściowe zadania jako zestaw par klucz-wartość i tworzy zestaw par klucz-wartość jako dane wyjściowe zadania, prawdopodobnie różnych typów.
Klasy klucza i wartości muszą być możliwe do serializacji przez platformę i dlatego wymagane jest zaimplementowanie interfejsu Writable. Ponadto klasy kluczy muszą implementować interfejs WritableComparable, aby ułatwić sortowanie według struktury.
Zarówno format wejściowy, jak i wyjściowy zadania MapReduce mają postać par klucz-wartość -
(Wejście) <k1, v1> -> map -> <k2, v2> -> zredukuj -> <k3, v3> (wyjście).
Wejście | Wynik | |
---|---|---|
Mapa | <k1, v1> | lista (<k2, v2>) |
Zmniejszyć | <k2, list (v2)> | lista (<k3, v3>) |
Implementacja MapReduce
Poniższa tabela przedstawia dane dotyczące zużycia energii elektrycznej przez organizację. Tabela zawiera miesięczne zużycie energii elektrycznej oraz średnią roczną z pięciu kolejnych lat.
Jan | Luty | Zniszczyć | Kwi | Może | Jun | Lip | Sie | Wrz | Paź | Lis | Dec | Śr | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Musimy napisać aplikacje do przetwarzania danych wejściowych w podanej tabeli, aby znaleźć rok maksymalnego wykorzystania, rok minimalnego użycia i tak dalej. Zadanie to jest łatwe dla programistów ze skończoną liczbą rekordów, ponieważ po prostu napiszą logikę, aby wygenerować wymagane dane wyjściowe, i przekażą dane do napisanej aplikacji.
Podnieśmy teraz skalę danych wejściowych. Załóżmy, że musimy przeanalizować zużycie energii elektrycznej we wszystkich gałęziach przemysłu na dużą skalę w danym stanie. Kiedy piszemy aplikacje do przetwarzania takich danych zbiorczych,
Wykonanie ich zajmie dużo czasu.
Podczas przenoszenia danych ze źródła na serwer sieciowy będzie duży ruch w sieci.
Aby rozwiązać te problemy, mamy strukturę MapReduce.
Dane wejściowe
Powyższe dane są zapisywane jako sample.txti podane jako dane wejściowe. Plik wejściowy wygląda tak, jak pokazano poniżej.
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
Przykładowy program
Poniższy program dla przykładowych danych używa struktury MapReduce.
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable, /*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()){
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements
Reducer< Text, IntWritable, Text, IntWritable >
{
//Reduce function
public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(Eleunits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Zapisz powyższy program w ProcessUnits.java. Kompilację i wykonanie programu podano poniżej.
Kompilacja i wykonanie programu ProcessUnits
Załóżmy, że znajdujemy się w katalogu domowym użytkownika Hadoop (np. / Home / hadoop).
Postępuj zgodnie z instrukcjami podanymi poniżej, aby skompilować i uruchomić powyższy program.
Step 1 - Użyj następującego polecenia, aby utworzyć katalog do przechowywania skompilowanych klas Java.
$ mkdir units
Step 2- Pobierz Hadoop-core-1.2.1.jar, który jest używany do kompilowania i wykonywania programu MapReduce. Pobierz jar ze strony mvnrepository.com . Załóżmy, że folder pobierania to / home / hadoop /.
Step 3 - Poniższe polecenia służą do kompilowania pliku ProcessUnits.java program i stworzyć słoik dla programu.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4 - Następujące polecenie służy do tworzenia katalogu wejściowego w formacie HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5 - Następujące polecenie służy do kopiowania pliku wejściowego o nazwie sample.txt w katalogu wejściowym HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6 - Następujące polecenie służy do weryfikacji plików w katalogu wejściowym
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7 - Następujące polecenie służy do uruchamiania aplikacji Eleunit_max poprzez pobieranie plików wejściowych z katalogu wejściowego.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Poczekaj chwilę, aż plik zostanie wykonany. Po wykonaniu dane wyjściowe zawierają szereg podziałów danych wejściowych, zadań mapowania, zadań reduktora itp.
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
Step 8 - Następujące polecenie służy do weryfikacji plików wynikowych w folderze wyjściowym.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9 - Następujące polecenie służy do wyświetlania danych wyjściowych w formacie Part-00000plik. Ten plik jest generowany przez HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Poniżej przedstawiono dane wyjściowe wygenerowane przez program MapReduce -
1981 | 34 |
1984 | 40 |
1985 | 45 |
Step 10 - Następujące polecenie służy do kopiowania folderu wyjściowego z HDFS do lokalnego systemu plików.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop