Hadoop - Streaming
Streaming Hadoop adalah utilitas yang disertakan dengan distribusi Hadoop. Utilitas ini memungkinkan Anda untuk membuat dan menjalankan pekerjaan Map / Reduce dengan eksekusi atau skrip apa pun sebagai pemeta dan / atau peredam.
Contoh Menggunakan Python
Untuk streaming Hadoop, kami mempertimbangkan masalah jumlah kata. Setiap pekerjaan di Hadoop harus memiliki dua fase: mapper dan reducer. Kami telah menulis kode untuk mapper dan reducer dalam skrip python untuk menjalankannya di bawah Hadoop. Seseorang juga dapat menulis yang sama di Perl dan Ruby.
Kode Fase Mapper
!/usr/bin/python
import sys
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Break the line into words
words = myline.split()
# Iterate the words list
for myword in words:
# Write the results to standard output
print '%s\t%s' % (myword, 1)
Pastikan file ini memiliki izin eksekusi (chmod + x / home / expert / hadoop-1.2.1 / mapper.py).
Kode Fase Peredam
#!/usr/bin/python
from operator import itemgetter
import sys
current_word = ""
current_count = 0
word = ""
# Input takes from standard input for myline in sys.stdin:
# Remove whitespace either side
myline = myline.strip()
# Split the input we got from mapper.py word,
count = myline.split('\t', 1)
# Convert count variable to integer
try:
count = int(count)
except ValueError:
# Count was not a number, so silently ignore this line continue
if current_word == word:
current_count += count
else:
if current_word:
# Write result to standard output print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# Do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Simpan kode mapper dan reducer di mapper.py dan reducer.py di direktori home Hadoop. Pastikan file-file ini memiliki izin eksekusi (chmod + x mapper.py dan chmod + x reducer.py). Karena python peka indentasi sehingga kode yang sama dapat diunduh dari tautan di bawah ini.
Eksekusi Program WordCount
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
-input input_dirs \
-output output_dir \
-mapper <path/mapper.py \
-reducer <path/reducer.py
Di mana "\" digunakan untuk kelanjutan baris agar dapat dibaca dengan jelas.
Sebagai contoh,
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py
Cara Kerja Streaming
Dalam contoh di atas, baik mapper dan reducer adalah skrip python yang membaca masukan dari masukan standar dan memancarkan keluaran ke keluaran standar. Utilitas akan membuat pekerjaan Map / Reduce, mengirimkan pekerjaan ke cluster yang sesuai, dan memantau kemajuan pekerjaan hingga selesai.
Ketika skrip ditentukan untuk pembuat peta, setiap tugas pembuat peta akan meluncurkan skrip sebagai proses terpisah saat pembuat peta diinisialisasi. Saat tugas mapper berjalan, ia mengubah masukannya menjadi garis dan memberi makan baris tersebut ke masukan standar (STDIN) dari proses tersebut. Sementara itu, pembuat peta mengumpulkan keluaran berorientasi garis dari keluaran standar (STDOUT) dari proses dan mengubah setiap baris menjadi pasangan kunci / nilai, yang dikumpulkan sebagai keluaran dari mapper. Secara default, awalan baris hingga karakter tab pertama adalah kunci dan sisa baris (tidak termasuk karakter tab) akan menjadi nilainya. Jika tidak ada karakter tab di baris, maka seluruh baris dianggap sebagai kunci dan nilainya null. Namun, ini dapat disesuaikan, sesuai kebutuhan.
Saat skrip ditentukan untuk peredam, setiap tugas peredam akan meluncurkan skrip sebagai proses terpisah, kemudian peredam diinisialisasi. Saat tugas peredam berjalan, ia mengubah pasangan kunci / nilai masukannya menjadi garis dan memasukkan garis tersebut ke masukan standar (STDIN) dari proses tersebut. Sementara itu, peredam mengumpulkan keluaran berorientasi garis dari keluaran standar (STDOUT) proses, mengubah setiap baris menjadi pasangan kunci / nilai, yang dikumpulkan sebagai keluaran peredam. Secara default, awalan baris hingga karakter tab pertama adalah kunci dan sisa baris (tidak termasuk karakter tab) adalah nilainya. Namun, ini dapat disesuaikan sesuai kebutuhan spesifik.
Perintah Penting
Parameter | Pilihan | Deskripsi |
---|---|---|
-input direktori / nama-file | Yg dibutuhkan | Lokasi input untuk mapper. |
nama direktori -output | Yg dibutuhkan | Lokasi keluaran untuk peredam. |
-mapper dapat dieksekusi atau script atau JavaClassName | Yg dibutuhkan | Mapper dapat dieksekusi. |
-reducer dapat dieksekusi atau script atau JavaClassName | Yg dibutuhkan | Peredam dapat dieksekusi. |
-nama file file | Pilihan | Membuat pembuat peta, peredam, atau penggabung dapat dieksekusi tersedia secara lokal di node komputasi. |
-inputformat JavaClassName | Pilihan | Kelas yang Anda berikan harus mengembalikan pasangan kunci / nilai kelas Teks. Jika tidak ditentukan, TextInputFormat akan digunakan sebagai default. |
-outputformat JavaClassName | Pilihan | Kelas yang Anda berikan harus mengambil pasangan kunci / nilai dari kelas Teks. Jika tidak ditentukan, TextOutputformat digunakan sebagai default. |
-partitioner JavaClassName | Pilihan | Kelas yang menentukan pengurangan kunci mana yang dikirim. |
-combiner streamingCommand atau JavaClassName | Pilihan | Combiner dapat dieksekusi untuk keluaran peta. |
-cmdenv name = nilai | Pilihan | Meneruskan variabel lingkungan ke perintah streaming. |
-inputreader | Pilihan | Untuk kompatibilitas mundur: menentukan kelas pembaca rekaman (bukan kelas format input). |
-verbose | Pilihan | Keluaran panjang. |
-lazyOutput | Pilihan | Membuat keluaran dengan malas. Misalnya, jika format output didasarkan pada FileOutputFormat, file output dibuat hanya pada panggilan pertama ke output.collect (atau Context.write). |
-numReduceTasks | Pilihan | Menentukan jumlah reduksi. |
-mapdebug | Pilihan | Script untuk dipanggil ketika tugas peta gagal. |
-reducedebug | Pilihan | Script untuk dipanggil saat pengurangan tugas gagal. |