MapReduce - Краткое руководство

MapReduce - это модель программирования для написания приложений, которые могут обрабатывать большие данные параллельно на нескольких узлах. MapReduce предоставляет аналитические возможности для анализа огромных объемов сложных данных.

Что такое большие данные?

Большие данные - это набор больших наборов данных, которые невозможно обработать с помощью традиционных вычислительных технологий. Например, объем данных, которые необходимы Facebook или Youtube для ежедневного сбора и обработки, может подпадать под категорию больших данных. Однако большие данные - это не только масштаб и объем, они также включают один или несколько из следующих аспектов - скорость, разнообразие, объем и сложность.

Почему именно MapReduce?

Традиционные корпоративные системы обычно имеют централизованный сервер для хранения и обработки данных. На следующем рисунке показано схематическое изображение традиционной корпоративной системы. Традиционная модель, безусловно, не подходит для обработки огромных объемов масштабируемых данных и не может быть приспособлена стандартными серверами баз данных. Более того, централизованная система создает слишком много узких мест при одновременной обработке нескольких файлов.

Google решил эту проблему с узким местом с помощью алгоритма MapReduce. MapReduce делит задачу на небольшие части и назначает их множеству компьютеров. Позже результаты собираются в одном месте и объединяются для формирования набора данных результатов.

Как работает MapReduce?

Алгоритм MapReduce содержит две важные задачи, а именно Map и Reduce.

  • Задача «Карта» принимает набор данных и преобразует его в другой набор данных, где отдельные элементы разбиваются на кортежи (пары ключ-значение).

  • Задача Reduce принимает выходные данные Map в качестве входных данных и объединяет эти кортежи данных (пары ключ-значение) в меньший набор кортежей.

Задача сокращения всегда выполняется после задания карты.

Давайте теперь внимательно рассмотрим каждую из фаз и попытаемся понять их значение.

  • Input Phase - Здесь у нас есть средство чтения записей, которое переводит каждую запись во входном файле и отправляет проанализированные данные модулю сопоставления в виде пар ключ-значение.

  • Map - Карта - это определяемая пользователем функция, которая принимает серию пар ключ-значение и обрабатывает каждую из них для создания нуля или более пар ключ-значение.

  • Intermediate Keys - Эти пары ключ-значение, сгенерированные преобразователем, известны как промежуточные ключи.

  • Combiner- Комбайнер - это тип локального редуктора, который группирует похожие данные из фазы карты в идентифицируемые наборы. Он принимает промежуточные ключи от преобразователя в качестве входных данных и применяет определенный пользователем код для агрегирования значений в небольшой области одного преобразователя. Он не является частью основного алгоритма MapReduce; это необязательно.

  • Shuffle and Sort- Задача «Редуктор» начинается с этапа «Перемешать и отсортировать». Он загружает сгруппированные пары ключ-значение на локальный компьютер, на котором работает Reducer. Отдельные пары "ключ-значение" сортируются по ключу в более крупный список данных. Список данных группирует эквивалентные ключи вместе, чтобы их значения можно было легко повторить в задаче Reducer.

  • Reducer- Редуктор принимает сгруппированные парные данные "ключ-значение" в качестве входных данных и запускает функцию редуктора для каждого из них. Здесь данные могут быть агрегированы, отфильтрованы и объединены различными способами, и для этого требуется широкий диапазон обработки. По окончании выполнения на последнем этапе выдается ноль или более пар "ключ-значение".

  • Output Phase - На этапе вывода у нас есть средство форматирования вывода, которое переводит окончательные пары ключ-значение из функции Reducer и записывает их в файл с помощью средства записи.

Давайте попробуем разобраться в двух задачах Map & f Reduce с помощью небольшой диаграммы -

MapReduce-Example

Давайте возьмем реальный пример, чтобы понять возможности MapReduce. Twitter получает около 500 миллионов твитов в день, что составляет почти 3000 твитов в секунду. На следующем рисунке показано, как Tweeter управляет своими твитами с помощью MapReduce.

Как показано на рисунке, алгоритм MapReduce выполняет следующие действия:

  • Tokenize - Токенизирует твиты в карты токенов и записывает их как пары ключ-значение.

  • Filter - Фильтрует нежелательные слова из карт токенов и записывает отфильтрованные карты в виде пар ключ-значение.

  • Count - Создает счетчик токенов на слово.

  • Aggregate Counters - Подготавливает совокупность одинаковых значений счетчиков в небольшие управляемые единицы.

Алгоритм MapReduce содержит две важные задачи, а именно Map и Reduce.

  • Задача карты выполняется с помощью класса Mapper.
  • Задача сокращения выполняется с помощью класса Reducer.

Класс Mapper принимает ввод, токенизирует его, сопоставляет и сортирует. Выходные данные класса Mapper используются в качестве входных данных для класса Reducer, который, в свою очередь, ищет подходящие пары и сокращает их.

MapReduce реализует различные математические алгоритмы для разделения задачи на небольшие части и назначения их нескольким системам. С технической точки зрения алгоритм MapReduce помогает отправлять задачи Map & Reduce на соответствующие серверы в кластере.

Эти математические алгоритмы могут включать следующее:

  • Sorting
  • Searching
  • Indexing
  • TF-IDF

Сортировка

Сортировка - один из основных алгоритмов MapReduce для обработки и анализа данных. MapReduce реализует алгоритм сортировки для автоматической сортировки выходных пар ключ-значение из сопоставителя по их ключам.

  • Методы сортировки реализованы в самом классе mapper.

  • На этапе перемешивания и сортировки после токенизации значений в классе сопоставления Context class (определяемый пользователем класс) собирает соответствующие ключи в виде коллекции.

  • Чтобы собрать похожие пары ключ-значение (промежуточные ключи), класс Mapper использует RawComparator класс для сортировки пар ключ-значение.

  • Набор промежуточных пар "ключ-значение" для данного редуктора автоматически сортируется Hadoop для формирования ключей и значений (K2, {V2, V2,…}), прежде чем они будут представлены редуктору.

Поиск

Поиск играет важную роль в алгоритме MapReduce. Это помогает на этапе объединения (необязательно) и на этапе восстановления. Попробуем разобраться, как работает Searching, на примере.

пример

В следующем примере показано, как MapReduce использует алгоритм поиска, чтобы узнать подробности о сотруднике, получающем самую высокую зарплату в данном наборе данных о сотрудниках.

  • Предположим, у нас есть данные о сотрудниках в четырех разных файлах - A, B, C и D. Предположим также, что во всех четырех файлах есть повторяющиеся записи о сотрудниках из-за многократного импорта данных о сотрудниках из всех таблиц базы данных. См. Следующую иллюстрацию.

  • The Map phaseобрабатывает каждый входной файл и предоставляет данные о сотрудниках в виде пар ключ-значение (<k, v>: <emp name, salary>). См. Следующую иллюстрацию.

  • The combiner phase(метод поиска) примет входные данные с этапа сопоставления в виде пары ключ-значение с именем сотрудника и зарплатой. Используя технику поиска, комбайнер проверит всю заработную плату сотрудников, чтобы найти самого высокооплачиваемого сотрудника в каждом файле. См. Следующий фрагмент.

<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary

if(v(second employee).salary > Max){
   Max = v(salary);
}

else{
   Continue checking;
}

Ожидаемый результат выглядит следующим образом -

<сатиш, 26000>

<гопал, 50000>

<киран, 45000>

<маниша, 45000>

  • Reducer phase- Сформируйте каждый файл, вы найдете самого высокооплачиваемого сотрудника. Чтобы избежать дублирования, проверьте все пары <k, v> и удалите повторяющиеся записи, если таковые имеются. Тот же алгоритм используется между четырьмя парами <k, v>, которые поступают из четырех входных файлов. Окончательный результат должен быть следующим -

<gopal, 50000>

Индексирование

Обычно индексация используется для указания на определенные данные и их адрес. Он выполняет пакетную индексацию входных файлов для конкретного Mapper.

Техника индексирования, которая обычно используется в MapReduce, известна как inverted index.Поисковые системы, такие как Google и Bing, используют метод перевернутой индексации. Попробуем разобраться, как работает индексация, на простом примере.

пример

Следующий текст является вводом для инвертированной индексации. Здесь T [0], T [1] и t [2] - имена файлов, а их содержимое заключено в двойные кавычки.

T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"

После применения алгоритма индексирования мы получаем следующий результат -

"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}

Здесь "a": {2} означает, что термин "a" появляется в файле T [2]. Точно так же "is": {0, 1, 2} подразумевает, что термин "is" появляется в файлах T [0], T [1] и T [2].

TF-IDF

TF-IDF - это алгоритм обработки текста, который является сокращением от Term Frequency - Inverse Document Frequency. Это один из распространенных алгоритмов веб-анализа. Здесь термин «частота» означает, сколько раз термин встречается в документе.

Частота сроков (TF)

Он измеряет, как часто в документе встречается конкретный термин. Он рассчитывается как количество раз, когда слово появляется в документе, деленное на общее количество слов в этом документе.

TF(the) = (Number of times term the ‘the’ appears in a document) / (Total number of terms in the document)

Обратная частота документов (IDF)

Он измеряет важность термина. Он рассчитывается путем деления количества документов в текстовой базе данных на количество документов, в которых встречается конкретный термин.

При вычислении TF все термины считаются одинаково важными. Это означает, что TF подсчитывает частоту терминов для обычных слов, таких как «есть», «а», «что» и т. Д. Таким образом, нам нужно знать частые термины, увеличивая масштаб редких, вычисляя следующее

IDF(the) = log_e(Total number of documents / Number of documents with term ‘the’ in it).

Алгоритм объясняется ниже на небольшом примере.

пример

Рассмотрим документ, содержащий 1000 слов, в которых слово hiveпоявляется 50 раз. TF дляhive тогда (50/1000) = 0,05.

Теперь предположим, что у нас есть 10 миллионов документов и слово hiveпоявляется в 1000 из них. Тогда IDF рассчитывается как log (10,000,000 / 1,000) = 4.

Вес TF-IDF является произведением этих величин - 0,05 × 4 = 0,20.

MapReduce работает только в операционных системах, адаптированных под Linux, и поставляется со встроенной платформой Hadoop Framework. Нам необходимо выполнить следующие шаги, чтобы установить фреймворк Hadoop.

Проверка установки JAVA

Перед установкой Hadoop в вашей системе должна быть установлена ​​Java. Используйте следующую команду, чтобы проверить, установлена ​​ли в вашей системе Java.

$ java –version

Если Java уже установлена ​​в вашей системе, вы увидите следующий ответ -

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

Если в вашей системе не установлена ​​Java, выполните действия, указанные ниже.

Установка Java

Шаг 1

Загрузите последнюю версию Java по следующей ссылке - этой ссылке .

После загрузки вы можете найти файл jdk-7u71-linux-x64.tar.gz в папке "Загрузки".

Шаг 2

Используйте следующие команды для извлечения содержимого jdk-7u71-linux-x64.gz.

$ cd Downloads/
$ ls jdk-7u71-linux-x64.gz $ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz

Шаг 3

Чтобы сделать Java доступной для всех пользователей, вы должны переместить ее в папку «/ usr / local /». Зайдите в root и введите следующие команды -

$ su
password:
# mv jdk1.7.0_71 /usr/local/java
# exit

Шаг 4

Для настройки переменных PATH и JAVA_HOME добавьте следующие команды в файл ~ / .bashrc.

export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin

Примените все изменения к текущей работающей системе.

$ source ~/.bashrc

Шаг 5

Используйте следующие команды для настройки альтернатив Java -

# alternatives --install /usr/bin/java java usr/local/java/bin/java 2

# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2

# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2

# alternatives --set java usr/local/java/bin/java

# alternatives --set javac usr/local/java/bin/javac

# alternatives --set jar usr/local/java/bin/jar

Теперь проверьте установку с помощью команды java -version с терминала.

Проверка установки Hadoop

Перед установкой MapReduce в вашей системе должен быть установлен Hadoop. Давайте проверим установку Hadoop, используя следующую команду -

$ hadoop version

Если Hadoop уже установлен в вашей системе, вы получите следующий ответ:

Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

Если Hadoop не установлен в вашей системе, выполните следующие действия.

Скачивание Hadoop

Загрузите Hadoop 2.4.1 с Apache Software Foundation и извлеките его содержимое с помощью следующих команд.

$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

Установка Hadoop в псевдораспределенном режиме

Следующие шаги используются для установки Hadoop 2.4.1 в псевдораспределенном режиме.

Шаг 1 - Настройка Hadoop

Вы можете установить переменные среды Hadoop, добавив следующие команды в файл ~ / .bashrc.

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

Примените все изменения к текущей работающей системе.

$ source ~/.bashrc

Шаг 2 - Конфигурация Hadoop

Вы можете найти все файлы конфигурации Hadoop в папке «$ HADOOP_HOME / etc / hadoop». Вам необходимо внести соответствующие изменения в эти файлы конфигурации в соответствии с вашей инфраструктурой Hadoop.

$ cd $HADOOP_HOME/etc/hadoop

Для разработки программ Hadoop с использованием Java необходимо сбросить переменные среды Java в hadoop-env.sh файл, заменив значение JAVA_HOME на расположение Java в вашей системе.

export JAVA_HOME=/usr/local/java

Для настройки Hadoop вам необходимо отредактировать следующие файлы:

  • core-site.xml
  • hdfs-site.xml
  • yarn-site.xml
  • mapred-site.xml

core-site.xml

core-site.xml содержит следующую информацию:

  • Номер порта, используемый для экземпляра Hadoop
  • Память, выделенная для файловой системы
  • Ограничение памяти для хранения данных
  • Размер буферов чтения / записи

Откройте core-site.xml и добавьте следующие свойства между тегами <configuration> и </configuration>.

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000 </value>
   </property>
</configuration>

hdfs-site.xml

hdfs-site.xml содержит следующую информацию -

  • Ценность данных репликации
  • Путь к namenode
  • Путь к datanode ваших локальных файловых систем (место, где вы хотите сохранить Hadoop Infra)

Предположим следующие данные.

dfs.replication (data replication value) = 1

(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode

Откройте этот файл и добавьте следующие свойства между тегами <configuration>, </configuration>.

<configuration>

   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   
   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
   </property>
   
   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
   </property>
   
</configuration>

Note - В приведенном выше файле все значения свойств определяются пользователем, и вы можете вносить изменения в соответствии с вашей инфраструктурой Hadoop.

пряжа-site.xml

Этот файл используется для настройки пряжи в Hadoop. Откройте файл yarn-site.xml и добавьте следующие свойства между тегами <configuration>, </configuration>.

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

Этот файл используется для указания используемой структуры MapReduce. По умолчанию Hadoop содержит шаблон yarn-site.xml. Прежде всего, вам нужно скопировать файл из mapred-site.xml.template в файл mapred-site.xml, используя следующую команду.

$ cp mapred-site.xml.template mapred-site.xml

Откройте файл mapred-site.xml и добавьте следующие свойства между тегами <configuration>, </configuration>.

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

Проверка установки Hadoop

Следующие шаги используются для проверки установки Hadoop.

Шаг 1 - Настройка узла имени

Настройте namenode с помощью команды «hdfs namenode -format» следующим образом:

$ cd ~ $ hdfs namenode -format

Ожидаемый результат выглядит следующим образом -

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:

/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

Шаг 2 - Проверка файлов dfs Hadoop

Выполните следующую команду, чтобы запустить файловую систему Hadoop.

$ start-dfs.sh

Ожидаемый результат выглядит следующим образом -

10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

Шаг 3 - Проверка скрипта пряжи

Следующая команда используется для запуска сценария пряжи. Выполнение этой команды запустит ваши демоны пряжи.

$ start-yarn.sh

Ожидаемый результат выглядит следующим образом -

starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

Шаг 4 - Доступ к Hadoop в браузере

Номер порта по умолчанию для доступа к Hadoop - 50070. Используйте следующий URL-адрес, чтобы получить услуги Hadoop в своем браузере.

http://localhost:50070/

На следующем снимке экрана показан браузер Hadoop.

Шаг 5 - Проверьте все приложения кластера

Номер порта по умолчанию для доступа ко всем приложениям кластера - 8088. Используйте следующий URL-адрес, чтобы использовать эту службу.

http://localhost:8088/

На следующем снимке экрана показан браузер кластера Hadoop.

В этой главе мы внимательно рассмотрим классы и их методы, которые участвуют в операциях программирования MapReduce. В первую очередь мы сосредоточимся на следующем -

  • Интерфейс JobContext
  • Класс работы
  • Класс картографа
  • Класс редуктора

Интерфейс JobContext

Интерфейс JobContext - это супер-интерфейс для всех классов, который определяет различные задания в MapReduce. Это дает вам доступ только для чтения к заданию, которое предоставляется задачам во время их выполнения.

Ниже приведены подчиненные интерфейсы интерфейса JobContext.

S.No. Подинтерфейс Описание
1. MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Определяет контекст, который передается Mapper.

2. ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Определяет контекст, который передается редуктору.

Класс Job - это основной класс, реализующий интерфейс JobContext.

Класс работы

Класс Job - самый важный класс в API MapReduce. Он позволяет пользователю настраивать задание, отправлять его, контролировать его выполнение и запрашивать состояние. Установленные методы работают только до тех пор, пока задание не будет отправлено, после чего они вызовут исключение IllegalStateException.

Обычно пользователь создает приложение, описывает различные аспекты задания, а затем отправляет задание и отслеживает его выполнение.

Вот пример того, как отправить работу -

// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

Конструкторы

Ниже приводится краткое описание конструктора класса Job.

S.No Сводка конструктора
1 Job()
2 Job(Конфигурация)
3 Job(Конфигурация, строка jobName)

Методы

Некоторые из важных методов класса Job следующие:

S.No Описание метода
1 getJobName()

Указанное пользователем имя задания.

2 getJobState()

Возвращает текущее состояние задания.

3 isComplete()

Проверяет, закончена работа или нет.

4 setInputFormatClass()

Устанавливает InputFormat для задания.

5 setJobName(String name)

Устанавливает указанное пользователем имя задания.

6 setOutputFormatClass()

Устанавливает выходной формат для работы.

7 setMapperClass(Class)

Устанавливает Mapper для работы.

8 setReducerClass(Class)

Устанавливает редуктор для работы.

9 setPartitionerClass(Class)

Устанавливает разделитель для работы.

10 setCombinerClass(Class)

Устанавливает комбайнер для работы.

Класс картографа

Класс Mapper определяет задание Map. Сопоставляет входные пары "ключ-значение" с набором промежуточных пар "ключ-значение". Карты - это отдельные задачи, которые преобразуют входные записи в промежуточные записи. Преобразованные промежуточные записи не обязательно должны быть того же типа, что и входные записи. Данная входная пара может отображаться в ноль или в несколько выходных пар.

Метод

mapэто самый известный метод класса Mapper. Синтаксис определен ниже -

map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)

Этот метод вызывается один раз для каждой пары "ключ-значение" в разбиении ввода.

Класс редуктора

Класс Reducer определяет задание Reduce в MapReduce. Он сокращает набор промежуточных значений, имеющих общий ключ, до меньшего набора значений. Реализации Reducer могут получить доступ к Configuration для задания через метод JobContext.getConfiguration (). Редуктор имеет три основных этапа - перемешать, сортировать и уменьшить.

  • Shuffle - Редуктор копирует отсортированный вывод из каждого сопоставителя, используя HTTP по сети.

  • Sort- Фреймворк слиянием сортирует входные данные Reducer по ключам (поскольку разные Mappers могут выводить один и тот же ключ). Фазы перемешивания и сортировки происходят одновременно, т. Е. Во время выборки выходных данных они объединяются.

  • Reduce - На этом этапе метод reduce (Object, Iterable, Context) вызывается для каждого <ключа (набора значений)> в отсортированных входных данных.

Метод

reduceэто самый известный метод класса Reducer. Синтаксис определен ниже -

reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

Этот метод вызывается один раз для каждого ключа в коллекции пар ключ-значение.

MapReduce - это платформа, которая используется для написания приложений для надежной обработки огромных объемов данных на больших кластерах обычного оборудования. В этой главе вы познакомитесь с работой MapReduce в среде Hadoop с использованием Java.

Алгоритм MapReduce

Как правило, парадигма MapReduce основана на отправке программ сокращения карты на компьютеры, где находятся фактические данные.

  • Во время выполнения задания MapReduce Hadoop отправляет задачи Map и Reduce на соответствующие серверы в кластере.

  • Платформа управляет всеми деталями передачи данных, такими как выдача задач, проверка выполнения задач и копирование данных по кластеру между узлами.

  • Большая часть вычислений происходит на узлах с данными на локальных дисках, что снижает сетевой трафик.

  • После выполнения данной задачи кластер собирает и сокращает данные, чтобы сформировать соответствующий результат, и отправляет их обратно на сервер Hadoop.

Входы и выходы (перспектива Java)

Платформа MapReduce работает с парами «ключ-значение», то есть она рассматривает входные данные для задания как набор пар «ключ-значение» и создает набор пар «ключ-значение» в качестве выходных данных задания, предположительно различных типов.

Классы ключей и значений должны быть сериализуемыми платформой, и, следовательно, требуется реализовать интерфейс Writable. Кроме того, ключевые классы должны реализовать интерфейс WritableComparable, чтобы упростить сортировку фреймворком.

И входной, и выходной формат задания MapReduce имеют форму пар ключ-значение:

(Вход) <k1, v1> -> map -> <k2, v2> -> reduce -> <k3, v3> (Выход).

Ввод Вывод
карта <k1, v1> список (<k2, v2>)
Уменьшить <k2, список (v2)> список (<k3, v3>)

Реализация MapReduce

В следующей таблице приведены данные о потреблении электроэнергии в организации. Таблица включает ежемесячное потребление электроэнергии и среднегодовое значение за пять лет подряд.

Янв Фев Мар Апр май Июн Июл Авг Сен Октябрь Ноя Декабрь Средн.
1979 г. 23 23 2 43 год 24 25 26 26 26 26 25 26 25
1980 г. 26 27 28 28 28 30 31 год 31 год 31 год 30 30 30 29
1981 г. 31 год 32 32 32 33 34 35 год 36 36 34 34 34 34
1984 39 38 39 39 39 41 год 42 43 год 40 39 38 38 40
1985 г. 38 39 39 39 39 41 год 41 год 41 год 00 40 39 39 45

Нам нужно написать приложения для обработки входных данных в данной таблице, чтобы найти год максимального использования, год минимального использования и так далее. Эта задача проста для программистов с ограниченным количеством записей, поскольку они просто напишут логику для получения требуемого вывода и передадут данные в написанное приложение.

Давайте теперь увеличим масштаб входных данных. Предположим, нам необходимо проанализировать потребление электроэнергии всеми крупными отраслями промышленности конкретного государства. Когда мы пишем приложения для обработки таких массивов данных,

  • На их выполнение уйдет много времени.

  • Когда мы перемещаем данные из источника на сетевой сервер, будет интенсивный сетевой трафик.

Для решения этих проблем у нас есть фреймворк MapReduce.

Входные данные

Приведенные выше данные сохраняются как sample.txtи дан как вход. Входной файл выглядит так, как показано ниже.

1979 г. 23 23 2 43 год 24 25 26 26 26 26 25 26 25
1980 г. 26 27 28 28 28 30 31 год 31 год 31 год 30 30 30 29
1981 г. 31 год 32 32 32 33 34 35 год 36 36 34 34 34 34
1984 39 38 39 39 39 41 год 42 43 год 40 39 38 38 40
1985 г. 38 39 39 39 39 41 год 41 год 41 год 00 40 39 39 45

Пример программы

Следующая программа для демонстрационных данных использует платформу MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Сохраните указанную выше программу в ProcessUnits.java. Компиляция и выполнение программы приведены ниже.

Составление и выполнение программы ProcessUnits

Предположим, мы находимся в домашнем каталоге пользователя Hadoop (например, / home / hadoop).

Следуйте инструкциям ниже, чтобы скомпилировать и выполнить указанную выше программу.

Step 1 - Используйте следующую команду, чтобы создать каталог для хранения скомпилированных классов Java.

$ mkdir units

Step 2- Загрузите Hadoop-core-1.2.1.jar, который используется для компиляции и выполнения программы MapReduce. Загрузите банку с mvnrepository.com . Предположим, что папка загрузки - / home / hadoop /.

Step 3 - Следующие команды используются для компиляции ProcessUnits.java программу и создать банку для программы.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - Следующая команда используется для создания входного каталога в HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Следующая команда используется для копирования входного файла с именем sample.txt во входном каталоге HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - Следующая команда используется для проверки файлов во входном каталоге

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Следующая команда используется для запуска приложения Eleunit_max путем получения входных файлов из входного каталога.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Подождите, пока файл не запустится. После выполнения вывод содержит несколько входных разделений, задач карты, задач редуктора и т. Д.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - Следующая команда используется для проверки результирующих файлов в выходной папке.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Следующая команда используется для просмотра вывода в Part-00000файл. Этот файл создается HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Ниже приведен вывод, созданный программой MapReduce:

1981 г. 34
1984 40
1985 г. 45

Step 10 - Следующая команда используется для копирования выходной папки из HDFS в локальную файловую систему.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop

Разделитель работает как условие при обработке входного набора данных. Фаза разделения происходит после фазы карты и перед фазой сокращения.

Количество разделителей равно количеству редукторов. Это означает, что разделитель разделит данные по количеству редукторов. Следовательно, данные, передаваемые из одного модуля разделения, обрабатываются одним редуктором.

Разделитель

Разделитель разделяет пары ключ-значение промежуточных выходов карты. Он разделяет данные с использованием определяемого пользователем условия, которое работает как хеш-функция. Общее количество разделов совпадает с количеством задач Reducer для задания. Давайте рассмотрим пример, чтобы понять, как работает разделитель.

Реализация MapReduce Partitioner

Для удобства предположим, что у нас есть небольшая таблица с именем Employee со следующими данными. Мы будем использовать этот образец данных в качестве входного набора данных, чтобы продемонстрировать, как работает секционер.

Я бы имя Возраст Пол Зарплата
1201 гопал 45 мужчина 50 000
1202 Manisha 40 женский 50 000
1203 Халил 34 мужчина 30 000
1204 прасант 30 мужчина 30 000
1205 Киран 20 мужчина 40 000
1206 Laxmi 25 женский 35 000
1207 бхавья 20 женский 15 000
1208 решма 19 женский 15 000
1209 Кранти 22 мужчина 22 000
1210 Сатиш 24 мужчина 25 000
1211 Кришна 25 мужчина 25 000
1212 Аршад 28 мужчина 20 000
1213 Лаванья 18 женский 8 000

Мы должны написать приложение для обработки входного набора данных, чтобы найти самого высокооплачиваемого сотрудника по полу в разных возрастных группах (например, ниже 20, от 21 до 30, старше 30).

Входные данные

Приведенные выше данные сохраняются как input.txt в каталоге «/ home / hadoop / hadoopPartitioner» и указан как входной.

1201 гопал 45 мужчина 50000
1202 Manisha 40 женский 51000
1203 Халил 34 мужчина 30000
1204 прасант 30 мужчина 31000
1205 Киран 20 мужчина 40000
1206 Laxmi 25 женский 35000
1207 бхавья 20 женский 15000
1208 решма 19 женский 14000
1209 Кранти 22 мужчина 22000
1210 Сатиш 24 мужчина 25000
1211 Кришна 25 мужчина 26000
1212 Аршад 28 мужчина 20000
1213 Лаванья 18 женский 8000

На основе введенных данных ниже приводится алгоритмическое объяснение программы.

Задачи карты

Задача карты принимает пары ключ-значение в качестве входных данных, пока у нас есть текстовые данные в текстовом файле. Вход для этой задачи карты следующий:

Input - Ключом будет шаблон, такой как «любой специальный ключ + имя файла + номер строки» (пример: key = @ input1), а значением будут данные в этой строке (пример: value = 1201 \ t gopal \ t 45 \ t Male \ t 50000).

Method - Работа этой задачи карты выглядит следующим образом -

  • Прочтите value (данные записи), который поступает как входное значение из списка аргументов в строке.

  • Используя функцию разделения, разделите пол и сохраните в строковой переменной.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Отправить информацию о поле и данные записи value как выходную пару "ключ-значение" из задачи карты в partition task.

context.write(new Text(gender), new Text(value));
  • Повторите все вышеперечисленные шаги для всех записей в текстовом файле.

Output - Вы получите данные о поле и значении данных в виде пар "ключ-значение".

Задача разделителя

Задача разделителя принимает пары ключ-значение из задачи карты в качестве входных данных. Разделение подразумевает разделение данных на сегменты. В соответствии с заданными условными критериями разделов входные парные данные "ключ-значение" можно разделить на три части на основе критериев возраста.

Input - Все данные в наборе пар "ключ-значение".

key = значение поля Gender в записи.

value = Значение данных всей записи этого пола.

Method - Процесс логики разбиения выполняется следующим образом.

  • Считайте значение поля возраста из входной пары "ключ-значение".
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Проверьте значение возраста при следующих условиях.

    • Возраст меньше или равен 20
    • Возраст от 20 до 30 лет.
    • Возраст старше 30 лет.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Все данные пар "ключ-значение" сегментируются на три коллекции пар "ключ-значение". Редуктор работает индивидуально с каждой коллекцией.

Сократить количество задач

Количество задач разделителя равно количеству задач редуктора. Здесь у нас есть три задачи разделителя, и, следовательно, у нас есть три задачи редуктора, которые нужно выполнить.

Input - Редуктор будет выполняться три раза с разным набором пар ключ-значение.

ключ = значение поля пола в записи.

значение = все данные записи этого пола.

Method - К каждой коллекции будет применяться следующая логика.

  • Прочтите значение поля Salary каждой записи.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Проверьте зарплату с помощью переменной max. Если str [4] - это максимальная зарплата, тогда присвойте str [4] max, иначе пропустите шаг.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Повторите шаги 1 и 2 для каждой передачи ключей (передача ключей осуществляется для мужчин и женщин). После выполнения этих трех шагов вы найдете одну максимальную зарплату при выдаче мужских ключей и одну максимальную зарплату при выдаче женских ключей.

context.write(new Text(key), new IntWritable(max));

Output- Наконец, вы получите набор данных пары ключ-значение в трех коллекциях для разных возрастных групп. Он содержит максимальную зарплату из мужской коллекции и максимальную зарплату из женской коллекции в каждой возрастной группе соответственно.

После выполнения задач Map, Partitioner и Reduce три коллекции данных пары ключ-значение сохраняются в трех разных файлах в качестве выходных данных.

Все три задачи рассматриваются как задания MapReduce. Следующие требования и спецификации этих работ должны быть указаны в конфигурациях:

  • Название работы
  • Форматы ввода и вывода ключей и значений
  • Индивидуальные классы для задач Map, Reduce и Partitioner
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Пример программы

Следующая программа показывает, как реализовать разделители для заданных критериев в программе MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Сохраните приведенный выше код как PartitionerExample.javaв «/ home / hadoop / hadoopPartitioner». Компиляция и выполнение программы приведены ниже.

Компиляция и исполнение

Предположим, мы находимся в домашнем каталоге пользователя Hadoop (например, / home / hadoop).

Следуйте инструкциям ниже, чтобы скомпилировать и выполнить указанную выше программу.

Step 1- Загрузите Hadoop-core-1.2.1.jar, который используется для компиляции и выполнения программы MapReduce. Вы можете скачать банку с сайта mvnrepository.com .

Предположим, что загруженная папка - «/ home / hadoop / hadoopPartitioner».

Step 2 - Следующие команды используются для компиляции программы PartitionerExample.java и создание баночки для программы.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .

Step 3 - Используйте следующую команду для создания входного каталога в HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Используйте следующую команду, чтобы скопировать входной файл с именем input.txt во входном каталоге HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Используйте следующую команду для проверки файлов во входном каталоге.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Используйте следующую команду для запуска приложения Top salary, взяв входные файлы из входного каталога.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Подождите, пока файл не запустится. После выполнения выходные данные содержат несколько входных разделений, задач карты и задач Reducer.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Используйте следующую команду, чтобы проверить полученные файлы в выходной папке.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Вы найдете вывод в трех файлах, потому что вы используете в своей программе три модуля разметки и три редуктора.

Step 8 - Используйте следующую команду, чтобы увидеть вывод в Part-00000файл. Этот файл создается HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Используйте следующую команду, чтобы увидеть вывод в Part-00001 файл.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Используйте следующую команду, чтобы увидеть вывод в Part-00002 файл.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000

Комбайнер, также известный как semi-reducer, - это необязательный класс, который принимает входные данные от класса Map и после этого передает выходные пары ключ-значение классу Reducer.

Основная функция Combiner состоит в том, чтобы суммировать выходные записи карты с одним и тем же ключом. Выходные данные (коллекция «ключ-значение») объединителя будут отправлены по сети в фактическую задачу Reducer в качестве входных данных.

Комбайнер

Класс Combiner используется между классом Map и классом Reduce для уменьшения объема передачи данных между Map и Reduce. Обычно вывод задачи карты большой, а объем данных, переданных в задачу сокращения, велик.

На следующей диаграмме задач MapReduce показана ФАЗА КОМБИНЕРА.

Как работает комбайнер?

Вот краткое описание того, как работает MapReduce Combiner -

  • У комбайнера нет предопределенного интерфейса, и он должен реализовывать метод reduce () интерфейса Reducer.

  • Комбайнер работает с каждым ключом вывода карты. Он должен иметь те же выходные типы "ключ-значение", что и класс Reducer.

  • Объединитель может создавать сводную информацию из большого набора данных, поскольку он заменяет исходный вывод карты.

Хотя Combiner является необязательным, но он помогает разделить данные на несколько групп для фазы сокращения, что упрощает обработку.

Реализация MapReduce Combiner

Следующий пример дает теоретическое представление о комбайнерах. Предположим, у нас есть следующий входной текстовый файл с именемinput.txt для MapReduce.

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

Важные этапы программы MapReduce с Combiner обсуждаются ниже.

Читатель Записи

Это первая фаза MapReduce, на которой средство чтения записей считывает каждую строку из входного текстового файла как текст и выдает выходные данные в виде пар ключ-значение.

Input - Построчно текст из входного файла.

Output- Формирует пары "ключ-значение". Ниже приводится набор ожидаемых пар "ключ-значение".

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Фаза карты

На этапе сопоставления вводятся данные от средства чтения записей, обрабатываются и выводятся в виде еще одного набора пар ключ-значение.

Input - Следующая пара "ключ-значение" - это ввод, полученный из средства чтения записей.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Фаза сопоставления считывает каждую пару «ключ-значение», разделяет каждое слово от значения с помощью StringTokenizer, обрабатывает каждое слово как ключ, а счетчик этого слова как значение. В следующем фрагменте кода показаны класс Mapper и функция карты.

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

Output - Ожидаемый результат следующий -

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

Фаза сумматора

Фаза объединения берет каждую пару ключ-значение из фазы сопоставления, обрабатывает ее и выдает результат в виде key-value collection пары.

Input - Следующая пара "ключ-значение" - это входные данные, взятые на этапе сопоставления.

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

Фаза объединения считывает каждую пару ключ-значение, объединяет общие слова как ключ и значения как коллекцию. Обычно код и работа Combiner аналогичны программе Reducer. Ниже приведен фрагмент кода для объявления классов Mapper, Combiner и Reducer.

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

Output - Ожидаемый результат следующий -

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Фаза редуктора

Этап редуктора берет каждую пару коллекции ключ-значение из фазы объединения, обрабатывает ее и передает выходные данные в виде пар ключ-значение. Обратите внимание, что функциональность Combiner такая же, как у Reducer.

Input - Следующая пара "ключ-значение" - это входные данные, взятые на этапе объединения.

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Фаза редуктора считывает каждую пару "ключ-значение". Ниже приведен фрагмент кода для Combiner.

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

Output - Ожидаемый результат фазы редуктора следующий:

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Рекорд писатель

Это последний этап MapReduce, на котором средство записи записи записывает каждую пару ключ-значение из этапа редуктора и отправляет результат в виде текста.

Input - Каждая пара "ключ-значение" из этапа "Редуктор" вместе с форматом вывода.

Output- Он дает вам пары ключ-значение в текстовом формате. Ниже приводится ожидаемый результат.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

Пример программы

Следующий блок кода подсчитывает количество слов в программе.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
		
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
		
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
		
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

Сохраните указанную выше программу как WordCount.java. Компиляция и выполнение программы приведены ниже.

Компиляция и исполнение

Предположим, мы находимся в домашнем каталоге пользователя Hadoop (например, / home / hadoop).

Следуйте инструкциям ниже, чтобы скомпилировать и выполнить указанную выше программу.

Step 1 - Используйте следующую команду, чтобы создать каталог для хранения скомпилированных классов Java.

$ mkdir units

Step 2- Загрузите Hadoop-core-1.2.1.jar, который используется для компиляции и выполнения программы MapReduce. Вы можете скачать банку с сайта mvnrepository.com .

Предположим, что загруженная папка - / home / hadoop /.

Step 3 - Используйте следующие команды для компиляции WordCount.java программу и создать банку для программы.

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

Step 4 - Используйте следующую команду для создания входного каталога в HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - Используйте следующую команду, чтобы скопировать входной файл с именем input.txt во входном каталоге HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

Step 6 - Используйте следующую команду для проверки файлов во входном каталоге.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - Используйте следующую команду, чтобы запустить приложение подсчета слов, взяв входные файлы из входного каталога.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Подождите, пока файл не запустится. После выполнения выходные данные содержат несколько входных разделений, задач карты и задач редуктора.

Step 8 - Используйте следующую команду, чтобы проверить полученные файлы в выходной папке.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - Используйте следующую команду, чтобы увидеть вывод в Part-00000файл. Этот файл создается HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Ниже приводится вывод, созданный программой MapReduce.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

В этой главе описывается администрирование Hadoop, которое включает администрирование как HDFS, так и MapReduce.

  • Администрирование HDFS включает в себя мониторинг файловой структуры HDFS, местоположений и обновленных файлов.

  • Администрирование MapReduce включает в себя мониторинг списка приложений, конфигурации узлов, статуса приложений и т. Д.

Мониторинг HDFS

HDFS (распределенная файловая система Hadoop) содержит пользовательские каталоги, входные и выходные файлы. Используйте команды MapReduce,put и get, для хранения и извлечения.

После запуска инфраструктуры Hadoop (демонов) путем передачи команды «start-all.sh» в «/ $ HADOOP_HOME / sbin» передайте в браузер следующий URL-адрес «http: // localhost: 50070». Вы должны увидеть следующий экран в своем браузере.

На следующем снимке экрана показано, как просматривать HDFS.

На следующем снимке экрана показана файловая структура HDFS. Он показывает файлы в каталоге «/ user / hadoop».

На следующем снимке экрана показана информация Datanode в кластере. Здесь вы можете найти один узел с его конфигурациями и возможностями.

Мониторинг заданий MapReduce

Приложение MapReduce - это набор заданий (задание карты, объединитель, разделитель и задание сокращения). Обязательно контролировать и поддерживать следующее:

  • Конфигурация узла данных, на котором подходит приложение.
  • Количество узлов данных и ресурсов, используемых для каждого приложения.

Чтобы контролировать все эти вещи, обязательно должен быть пользовательский интерфейс. После запуска инфраструктуры Hadoop путем передачи команды «start-all.sh» в «/ $ HADOOP_HOME / sbin» передайте в браузер следующий URL-адрес «http: // localhost: 8080». Вы должны увидеть следующий экран в своем браузере.

На приведенном выше снимке экрана указатель руки находится на идентификаторе приложения. Просто щелкните по нему, чтобы открыть в браузере следующий экран. Он описывает следующее -

  • На каком пользователе запущено текущее приложение

  • Название приложения

  • Тип этого приложения

  • Текущий статус, Окончательный статус

  • Время запуска приложения, прошедшее (время завершения), если оно завершено на момент мониторинга

  • История этого приложения, то есть информация журнала

  • И, наконец, информация об узлах, т. Е. Узлах, участвовавших в запуске приложения.

На следующем снимке экрана показаны детали конкретного приложения -

На следующем снимке экрана показана информация о текущих запущенных узлах. Здесь на скриншоте только один узел. Указатель в виде руки показывает адрес локального хоста работающего узла.