Apache Flume - Tìm nạp dữ liệu Twitter

Sử dụng Flume, chúng tôi có thể tìm nạp dữ liệu từ các dịch vụ khác nhau và vận chuyển nó đến các cửa hàng tập trung (HDFS và HBase). Chương này giải thích cách tìm nạp dữ liệu từ dịch vụ Twitter và lưu trữ nó trong HDFS bằng Apache Flume.

Như đã thảo luận trong Kiến trúc Flume, một máy chủ web tạo ra dữ liệu nhật ký và dữ liệu này được thu thập bởi một tác nhân trong Flume. Kênh đệm dữ liệu này vào một bồn rửa, cuối cùng đẩy dữ liệu này đến các cửa hàng tập trung.

Trong ví dụ được cung cấp trong chương này, chúng tôi sẽ tạo một ứng dụng và lấy các tweet từ nó bằng cách sử dụng nguồn twitter thử nghiệm do Apache Flume cung cấp. Chúng tôi sẽ sử dụng kênh bộ nhớ để đệm các tweet này và HDFS chìm để đẩy các tweet này vào HDFS.

Để tìm nạp dữ liệu Twitter, chúng tôi sẽ phải làm theo các bước dưới đây:

  • Tạo một ứng dụng twitter
  • Cài đặt / Khởi động HDFS
  • Định cấu hình Flume

Tạo ứng dụng Twitter

Để nhận được các tweet từ Twitter, cần phải tạo một ứng dụng Twitter. Làm theo các bước dưới đây để tạo một ứng dụng Twitter.

Bước 1

Để tạo một ứng dụng Twitter, hãy nhấp vào liên kết sau https://apps.twitter.com/. Đăng nhập vào tài khoản Twitter của bạn. Bạn sẽ có cửa sổ Quản lý ứng dụng Twitter nơi bạn có thể tạo, xóa và quản lý Ứng dụng Twitter.

Bước 2

Bấm vào Create New Appcái nút. Bạn sẽ được chuyển hướng đến một cửa sổ nơi bạn sẽ nhận được một mẫu đơn đăng ký trong đó bạn phải điền thông tin chi tiết của mình để tạo Ứng dụng. Trong khi điền địa chỉ trang web, hãy cung cấp mẫu URL hoàn chỉnh, ví dụ:http://example.com.

Bước 3

Điền vào các chi tiết, chấp nhận Developer Agreement khi hoàn thành, hãy nhấp vào Create your Twitter application buttonở cuối trang. Nếu mọi thứ suôn sẻ, một Ứng dụng sẽ được tạo với các chi tiết cụ thể như hình dưới đây.

Bước 4

Dưới keys and Access Tokens ở cuối trang, bạn có thể thấy một nút có tên Create my access token. Nhấp vào nó để tạo mã thông báo truy cập.

Bước 5

Cuối cùng, nhấp vào Test OAuthnút ở phía trên bên phải của trang. Điều này sẽ dẫn đến một trang hiển thịConsumer key, Consumer secret, Access token,Access token secret. Sao chép các chi tiết này. Chúng hữu ích để cấu hình tác nhân trong Flume.

Khởi động HDFS

Vì chúng tôi đang lưu trữ dữ liệu trong HDFS, chúng tôi cần cài đặt / xác minh Hadoop. Khởi động Hadoop và tạo một thư mục trong đó để lưu trữ dữ liệu Flume. Làm theo các bước dưới đây trước khi định cấu hình Flume.

Bước 1: Cài đặt / Xác minh Hadoop

Cài đặt Hadoop . Nếu Hadoop đã được cài đặt trong hệ thống của bạn, hãy xác minh cài đặt bằng lệnh phiên bản Hadoop, như được hiển thị bên dưới.

$ hadoop version

Nếu hệ thống của bạn chứa Hadoop và nếu bạn đã đặt biến đường dẫn, thì bạn sẽ nhận được kết quả sau:

Hadoop 2.6.0 
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 
Compiled by jenkins on 2014-11-13T21:10Z 
Compiled with protoc 2.5.0 
From source with checksum 18e43357c8f927c0695f1e9522859d6a 
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

Bước 2: Khởi động Hadoop

Duyệt qua sbin thư mục của Hadoop và bắt đầu sợi và Hadoop dfs (hệ thống tệp phân tán) như hình dưới đây.

cd /$Hadoop_Home/sbin/ 
$ start-dfs.sh 
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out 
localhost: starting datanode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out 
Starting secondary namenodes [0.0.0.0] 
starting secondarynamenode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out
  
$ start-yarn.sh 
starting yarn daemons 
starting resourcemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out 
localhost: starting nodemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out

Bước 3: Tạo thư mục trong HDFS

Trong Hadoop DFS, bạn có thể tạo các thư mục bằng lệnh mkdir. Duyệt qua nó và tạo một thư mục với têntwitter_data trong đường dẫn bắt buộc như hình dưới đây.

$cd /$Hadoop_Home/bin/ 
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data

Cấu hình Flume

Chúng tôi phải định cấu hình nguồn, kênh và bồn rửa bằng cách sử dụng tệp cấu hình trong confthư mục. Ví dụ được đưa ra trong chương này sử dụng nguồn thử nghiệm do Apache Flume cung cấp có tênTwitter 1% Firehose Kênh bộ nhớ và HDFS chìm.

Nguồn Twitter 1% Firehose

Nguồn này mang tính thử nghiệm cao. Nó kết nối với Twitter Firehose mẫu 1% bằng cách sử dụng API phát trực tuyến và liên tục tải xuống các tweet, chuyển đổi chúng sang định dạng Avro và gửi các sự kiện Avro đến một bồn chứa Flume hạ lưu.

Chúng tôi sẽ lấy nguồn này theo mặc định cùng với việc cài đặt Flume. Cácjar các tệp tương ứng với nguồn này có thể được đặt trong lib thư mục như hình dưới đây.

Đặt đường dẫn classpath

Đặt classpath biến thành lib thư mục Flume trong Flume-env.sh tập tin như hình dưới đây.

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/*

Nguồn này cần các chi tiết như Consumer key, Consumer secret, Access token,Access token secretcủa một ứng dụng Twitter. Trong khi định cấu hình nguồn này, bạn phải cung cấp giá trị cho các thuộc tính sau:

  • Channels

  • Source type : org.apache.flume.source.twitter.TwitterSource

  • consumerKey - Khóa người dùng OAuth

  • consumerSecret - Bí mật của người tiêu dùng OAuth

  • accessToken - Mã thông báo truy cập OAuth

  • accessTokenSecret - Bí mật mã thông báo OAuth

  • maxBatchSize- Số lượng tin nhắn twitter tối đa phải có trong một đợt twitter. Giá trị mặc định là 1000 (tùy chọn).

  • maxBatchDurationMillis- Số mili giây tối đa để chờ trước khi đóng một lô. Giá trị mặc định là 1000 (tùy chọn).

Kênh

Chúng tôi đang sử dụng kênh bộ nhớ. Để cấu hình kênh bộ nhớ, bạn phải cung cấp giá trị cho loại kênh.

  • type- Nó chứa loại kênh. Trong ví dụ của chúng tôi, loại làMemChannel.

  • Capacity- Đây là số lượng sự kiện tối đa được lưu trữ trong kênh. Giá trị mặc định của nó là 100 (tùy chọn).

  • TransactionCapacity- Là số lượng sự kiện tối đa mà kênh chấp nhận hoặc gửi. Giá trị mặc định của nó là 100 (tùy chọn).

HDFS Sink

Phần chìm này ghi dữ liệu vào HDFS. Để cấu hình bồn rửa này, bạn phải cung cấp các chi tiết sau.

  • Channel

  • type - hdfs

  • hdfs.path - đường dẫn của thư mục trong HDFS nơi dữ liệu sẽ được lưu trữ.

Và chúng tôi có thể cung cấp một số giá trị tùy chọn dựa trên kịch bản. Dưới đây là các thuộc tính tùy chọn của ổ HDFS mà chúng tôi đang định cấu hình trong ứng dụng của mình.

  • fileType - Đây là định dạng tệp bắt buộc của tệp HDFS của chúng tôi. SequenceFile, DataStreamCompressedStreamlà ba loại có sẵn với luồng này. Trong ví dụ của chúng tôi, chúng tôi đang sử dụngDataStream.

  • writeFormat - Có thể là văn bản hoặc có thể ghi.

  • batchSize- Đó là số sự kiện được ghi vào một tệp trước khi nó được chuyển vào HDFS. Giá trị mặc định của nó là 100.

  • rollsize- Đó là kích thước tệp để kích hoạt cuộn. Giá trị mặc định là 100.

  • rollCount- Là số sự kiện được ghi vào tệp trước khi cuộn. Giá trị mặc định của nó là 10.

Ví dụ - Tệp cấu hình

Dưới đây là một ví dụ về tệp cấu hình. Sao chép nội dung này và lưu dưới dạngtwitter.conf trong thư mục conf của Flume.

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS
  
# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret 
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token 
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret 
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
  
# Describing/Configuring the sink 

TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
 
# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 100
  
# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

Chấp hành

Duyệt qua thư mục chính của Flume và thực thi ứng dụng như hình dưới đây.

$ cd $FLUME_HOME 
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf 
Dflume.root.logger=DEBUG,console -n TwitterAgent

Nếu mọi thứ diễn ra tốt đẹp, việc phát trực tuyến các tweet vào HDFS sẽ bắt đầu. Dưới đây là ảnh chụp nhanh của cửa sổ nhắc lệnh trong khi tìm nạp các tweet.

Xác minh HDFS

Bạn có thể truy cập giao diện người dùng Web quản trị Hadoop bằng URL được cung cấp bên dưới.

http://localhost:50070/

Nhấp vào menu thả xuống có tên Utilitiesở phía bên phải của trang. Bạn có thể thấy hai tùy chọn như được hiển thị trong ảnh chụp nhanh bên dưới.

Bấm vào Browse the file systemvà nhập đường dẫn của thư mục HDFS nơi bạn đã lưu trữ các tweet. Trong ví dụ của chúng tôi, đường dẫn sẽ là/user/Hadoop/twitter_data/. Sau đó, bạn có thể xem danh sách các tệp nhật ký twitter được lưu trữ trong HDFS như bên dưới.