Hadoop - Streaming

Streaming Hadoop adalah utilitas yang disertakan dengan distribusi Hadoop. Utilitas ini memungkinkan Anda untuk membuat dan menjalankan pekerjaan Map / Reduce dengan eksekusi atau skrip apa pun sebagai pemeta dan / atau peredam.

Contoh Menggunakan Python

Untuk streaming Hadoop, kami mempertimbangkan masalah jumlah kata. Setiap pekerjaan di Hadoop harus memiliki dua fase: mapper dan reducer. Kami telah menulis kode untuk mapper dan reducer dalam skrip python untuk menjalankannya di bawah Hadoop. Seseorang juga dapat menulis yang sama di Perl dan Ruby.

Kode Fase Mapper

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Break the line into words 
   words = myline.split() 

   # Iterate the words list
   for myword in words:
      # Write the results to standard output 
      print '%s\t%s' % (myword, 1)

Pastikan file ini memiliki izin eksekusi (chmod + x / home / expert / hadoop-1.2.1 / mapper.py).

Kode Fase Peredam

#!/usr/bin/python

from operator import itemgetter 
import sys 

current_word = ""
current_count = 0 
word = "" 

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Split the input we got from mapper.py word, 
   count = myline.split('\t', 1) 

   # Convert count variable to integer 
   try: 
      count = int(count) 

   except ValueError: 
      # Count was not a number, so silently ignore this line continue

   if current_word == word: 
   current_count += count 
   else: 
      if current_word: 
         # Write result to standard output print '%s\t%s' % (current_word, current_count) 
   
      current_count = count
      current_word = word

# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s\t%s' % (current_word, current_count)

Simpan kode mapper dan reducer di mapper.py dan reducer.py di direktori home Hadoop. Pastikan file-file ini memiliki izin eksekusi (chmod + x mapper.py dan chmod + x reducer.py). Karena python peka indentasi sehingga kode yang sama dapat diunduh dari tautan di bawah ini.

Eksekusi Program WordCount

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \ 
   -output output_dir \ 
   -mapper <path/mapper.py \ 
   -reducer <path/reducer.py

Di mana "\" digunakan untuk kelanjutan baris agar dapat dibaca dengan jelas.

Sebagai contoh,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

Cara Kerja Streaming

Dalam contoh di atas, baik mapper dan reducer adalah skrip python yang membaca masukan dari masukan standar dan memancarkan keluaran ke keluaran standar. Utilitas akan membuat pekerjaan Map / Reduce, mengirimkan pekerjaan ke cluster yang sesuai, dan memantau kemajuan pekerjaan hingga selesai.

Ketika skrip ditentukan untuk pembuat peta, setiap tugas pembuat peta akan meluncurkan skrip sebagai proses terpisah saat pembuat peta diinisialisasi. Saat tugas mapper berjalan, ia mengubah masukannya menjadi garis dan memberi makan baris tersebut ke masukan standar (STDIN) dari proses tersebut. Sementara itu, pembuat peta mengumpulkan keluaran berorientasi garis dari keluaran standar (STDOUT) dari proses dan mengubah setiap baris menjadi pasangan kunci / nilai, yang dikumpulkan sebagai keluaran dari mapper. Secara default, awalan baris hingga karakter tab pertama adalah kunci dan sisa baris (tidak termasuk karakter tab) akan menjadi nilainya. Jika tidak ada karakter tab di baris, maka seluruh baris dianggap sebagai kunci dan nilainya null. Namun, ini dapat disesuaikan, sesuai kebutuhan.

Saat skrip ditentukan untuk peredam, setiap tugas peredam akan meluncurkan skrip sebagai proses terpisah, kemudian peredam diinisialisasi. Saat tugas peredam berjalan, ia mengubah pasangan kunci / nilai masukannya menjadi garis dan memasukkan garis tersebut ke masukan standar (STDIN) dari proses tersebut. Sementara itu, peredam mengumpulkan keluaran berorientasi garis dari keluaran standar (STDOUT) proses, mengubah setiap baris menjadi pasangan kunci / nilai, yang dikumpulkan sebagai keluaran peredam. Secara default, awalan baris hingga karakter tab pertama adalah kunci dan sisa baris (tidak termasuk karakter tab) adalah nilainya. Namun, ini dapat disesuaikan sesuai kebutuhan spesifik.

Perintah Penting

Parameter Pilihan Deskripsi
-input direktori / nama-file Yg dibutuhkan Lokasi input untuk mapper.
nama direktori -output Yg dibutuhkan Lokasi keluaran untuk peredam.
-mapper dapat dieksekusi atau script atau JavaClassName Yg dibutuhkan Mapper dapat dieksekusi.
-reducer dapat dieksekusi atau script atau JavaClassName Yg dibutuhkan Peredam dapat dieksekusi.
-nama file file Pilihan Membuat pembuat peta, peredam, atau penggabung dapat dieksekusi tersedia secara lokal di node komputasi.
-inputformat JavaClassName Pilihan Kelas yang Anda berikan harus mengembalikan pasangan kunci / nilai kelas Teks. Jika tidak ditentukan, TextInputFormat akan digunakan sebagai default.
-outputformat JavaClassName Pilihan Kelas yang Anda berikan harus mengambil pasangan kunci / nilai dari kelas Teks. Jika tidak ditentukan, TextOutputformat digunakan sebagai default.
-partitioner JavaClassName Pilihan Kelas yang menentukan pengurangan kunci mana yang dikirim.
-combiner streamingCommand atau JavaClassName Pilihan Combiner dapat dieksekusi untuk keluaran peta.
-cmdenv name = nilai Pilihan Meneruskan variabel lingkungan ke perintah streaming.
-inputreader Pilihan Untuk kompatibilitas mundur: menentukan kelas pembaca rekaman (bukan kelas format input).
-verbose Pilihan Keluaran panjang.
-lazyOutput Pilihan Membuat keluaran dengan malas. Misalnya, jika format output didasarkan pada FileOutputFormat, file output dibuat hanya pada panggilan pertama ke output.collect (atau Context.write).
-numReduceTasks Pilihan Menentukan jumlah reduksi.
-mapdebug Pilihan Script untuk dipanggil ketika tugas peta gagal.
-reducedebug Pilihan Script untuk dipanggil saat pengurangan tugas gagal.