Apache Kafka - Tích hợp với Spark
Trong chương này, chúng ta sẽ thảo luận về cách tích hợp Apache Kafka với Spark Streaming API.
Về Spark
Spark Streaming API cho phép xử lý luồng dữ liệu trực tiếp có thể mở rộng, thông lượng cao, chịu được lỗi. Dữ liệu có thể được nhập từ nhiều nguồn như Kafka, Flume, Twitter, v.v. và có thể được xử lý bằng các thuật toán phức tạp như các chức năng cấp cao như bản đồ, thu nhỏ, nối và cửa sổ. Cuối cùng, dữ liệu đã xử lý có thể được đẩy ra hệ thống tệp, cơ sở dữ liệu và bảng điều khiển trực tiếp. Tập dữ liệu phân tán có khả năng phục hồi (RDD) là một cấu trúc dữ liệu cơ bản của Spark. Nó là một tập hợp các đối tượng được phân phối bất biến. Mỗi tập dữ liệu trong RDD được chia thành các phân vùng logic, có thể được tính toán trên các nút khác nhau của cụm.
Tích hợp với Spark
Kafka là một nền tảng tích hợp và nhắn tin tiềm năng để phát trực tuyến Spark. Kafka đóng vai trò là trung tâm trung tâm cho các luồng dữ liệu thời gian thực và được xử lý bằng các thuật toán phức tạp trong Spark Streaming. Sau khi dữ liệu được xử lý, Spark Streaming có thể xuất bản kết quả vào một chủ đề Kafka khác hoặc lưu trữ trong HDFS, cơ sở dữ liệu hoặc trang tổng quan. Sơ đồ sau đây mô tả luồng khái niệm.
Bây giờ, hãy cùng chúng tôi tìm hiểu chi tiết về API Kafka-Spark.
API SparkConf
Nó đại diện cho cấu hình cho một ứng dụng Spark. Được sử dụng để đặt các tham số Spark khác nhau dưới dạng các cặp khóa-giá trị.
Lớp SparkConf
có các phương thức sau:
set(string key, string value) - thiết lập biến cấu hình.
remove(string key) - xóa khóa khỏi cấu hình.
setAppName(string name) - đặt tên ứng dụng cho ứng dụng của bạn.
get(string key) - lấy chìa khóa
API StreamingContext
Đây là điểm nhập chính cho chức năng Spark. SparkContext đại diện cho kết nối với một cụm Spark và có thể được sử dụng để tạo RDD, bộ tích lũy và các biến quảng bá trên cụm. Chữ ký được xác định như hình bên dưới.
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master - URL cụm để kết nối (ví dụ: mesos: // host: port, spark: // host: port, local [4]).
appName - tên cho công việc của bạn, để hiển thị trên giao diện người dùng web cụm
batchDuration - khoảng thời gian mà dữ liệu truyền trực tuyến sẽ được chia thành các lô
public StreamingContext(SparkConf conf, Duration batchDuration)
Tạo một StreamingContext bằng cách cung cấp cấu hình cần thiết cho một SparkContext mới.
conf - Thông số tia lửa
batchDuration - khoảng thời gian mà dữ liệu truyền trực tuyến sẽ được chia thành các lô
API KafkaUtils
API KafkaUtils được sử dụng để kết nối cụm Kafka với luồng Spark. API này có chữ ký createStream của
phương thức signifi
-cant được định nghĩa như bên dưới.
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
Phương thức được hiển thị ở trên được sử dụng để Tạo một luồng đầu vào kéo các thông điệp từ Kafka Brokers.
ssc - Đối tượng StreamingContext.
zkQuorum - Số đại biểu của Zookeeper.
groupId - Id nhóm cho người tiêu dùng này.
topics - trả lại một bản đồ các chủ đề để tiêu thụ.
storageLevel - Mức lưu trữ để sử dụng cho việc lưu trữ các đối tượng đã nhận.
KafkaUtils API có một phương thức khác là createDirectStream, được sử dụng để tạo luồng đầu vào trực tiếp kéo các thông báo từ Kafka Broker mà không cần sử dụng bất kỳ bộ thu nào. Luồng này có thể đảm bảo rằng mỗi tin nhắn từ Kafka được đưa vào các phép biến đổi chính xác một lần.
Ứng dụng mẫu được thực hiện trong Scala. Để biên dịch ứng dụng, vui lòng tải xuống và cài đặt công cụ xây dựng sbt
, scala (tương tự như maven). Mã ứng dụng chính được trình bày bên dưới.
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Xây dựng tập lệnh
Việc tích hợp spark-kafka phụ thuộc vào tia lửa, dòng tia lửa và bình tích hợp tia lửa Kafka. Tạo một tệp build.sbt mới
và chỉ định chi tiết ứng dụng và sự phụ thuộc của nó. Các SBT
sẽ tải về jar cần thiết khi biên dịch và đóng gói ứng dụng.
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
Biên dịch / Đóng gói
Chạy lệnh sau để biên dịch và đóng gói tệp jar của ứng dụng. Chúng tôi cần gửi tệp jar vào bảng điều khiển tia lửa để chạy ứng dụng.
sbt package
Phục tùng Spark
Khởi động Kafka Producer CLI (đã giải thích trong chương trước), tạo một chủ đề mới được gọi là chủ đề đầu tiên của tôi
và cung cấp một số thông báo mẫu như hình dưới đây.
Another spark test message
Chạy lệnh sau để gửi ứng dụng tới bảng điều khiển tia lửa.
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
Kết quả mẫu của ứng dụng này được hiển thị bên dưới.
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..