Apache Kafka - การผสานรวมกับ Spark
ในบทนี้เราจะพูดถึงวิธีการรวม Apache Kafka กับ Spark Streaming API
เกี่ยวกับ Spark
Spark Streaming API ช่วยให้สามารถประมวลผลสตรีมข้อมูลสดที่ปรับขนาดได้ปริมาณงานสูงและทนต่อข้อผิดพลาด ข้อมูลสามารถนำเข้าได้จากหลายแหล่งเช่น Kafka, Flume, Twitter เป็นต้นและสามารถประมวลผลโดยใช้อัลกอริทึมที่ซับซ้อนเช่นฟังก์ชันระดับสูงเช่นแผนที่การย่อขนาดการเข้าร่วมและหน้าต่าง ในที่สุดข้อมูลที่ประมวลผลแล้วสามารถส่งออกไปยังระบบไฟล์ฐานข้อมูลและแดชบอร์ดแบบสดได้ Resilient Distributed Datasets (RDD) เป็นโครงสร้างข้อมูลพื้นฐานของ Spark มันเป็นคอลเลกชันของวัตถุที่กระจายไม่เปลี่ยนรูป ชุดข้อมูลแต่ละชุดใน RDD จะแบ่งออกเป็นโลจิคัลพาร์ติชันซึ่งอาจคำนวณจากโหนดต่าง ๆ ของคลัสเตอร์
บูรณาการกับ Spark
Kafka เป็นแพลตฟอร์มการส่งข้อความและการผสานรวมที่มีศักยภาพสำหรับสตรีมมิ่ง Spark Kafka ทำหน้าที่เป็นศูนย์กลางสำหรับสตรีมข้อมูลแบบเรียลไทม์และประมวลผลโดยใช้อัลกอริทึมที่ซับซ้อนใน Spark Streaming เมื่อประมวลผลข้อมูลแล้ว Spark Streaming อาจเผยแพร่ผลลัพธ์ในหัวข้อ Kafka อื่นหรือจัดเก็บใน HDFS ฐานข้อมูลหรือแดชบอร์ด แผนภาพต่อไปนี้แสดงให้เห็นถึงการไหลของแนวคิด
ตอนนี้ให้เราดูรายละเอียดเกี่ยวกับ Kafka-Spark API
SparkConf API
แสดงถึงการกำหนดค่าสำหรับแอปพลิเคชัน Spark ใช้เพื่อตั้งค่าพารามิเตอร์ Spark ต่างๆเป็นคู่คีย์ - ค่า
คลาสSparkConf
มีวิธีการดังต่อไปนี้ -
set(string key, string value) - ตั้งค่าตัวแปรการกำหนดค่า
remove(string key) - ลบคีย์ออกจากการกำหนดค่า
setAppName(string name) - ตั้งชื่อแอปพลิเคชันสำหรับแอปพลิเคชันของคุณ
get(string key) - รับกุญแจ
StreamingContext API
นี่คือจุดเริ่มต้นหลักสำหรับฟังก์ชัน Spark SparkContext แสดงถึงการเชื่อมต่อกับคลัสเตอร์ Spark และสามารถใช้เพื่อสร้าง RDDs, ตัวสะสมและตัวแปรออกอากาศบนคลัสเตอร์ ลายเซ็นถูกกำหนดตามที่แสดงด้านล่าง
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master - คลัสเตอร์ URL ที่จะเชื่อมต่อ (เช่น mesos: // host: port, spark: // host: port, local [4])
appName - ชื่องานของคุณเพื่อแสดงบน UI เว็บคลัสเตอร์
batchDuration - ช่วงเวลาที่ข้อมูลการสตรีมจะถูกแบ่งออกเป็นแบทช์
public StreamingContext(SparkConf conf, Duration batchDuration)
สร้าง StreamingContext โดยจัดเตรียมคอนฟิกูเรชันที่จำเป็นสำหรับ SparkContext ใหม่
conf - พารามิเตอร์ Spark
batchDuration - ช่วงเวลาที่ข้อมูลการสตรีมจะถูกแบ่งออกเป็นแบทช์
KafkaUtils API
KafkaUtils API ใช้เพื่อเชื่อมต่อคลัสเตอร์ Kafka กับ Spark streaming API นี้มีวิธีการsignifi
-cant ลายเซ็นcreateStream ที่
กำหนดไว้ด้านล่าง
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
วิธีการที่แสดงด้านบนใช้เพื่อสร้างสตรีมอินพุตที่ดึงข้อความจาก Kafka Brokers
ssc - วัตถุ StreamingContext
zkQuorum - องค์ประชุมผู้ดูแลสวนสัตว์
groupId - รหัสกลุ่มสำหรับผู้บริโภครายนี้
topics - ส่งคืนแผนที่หัวข้อที่ต้องการบริโภค
storageLevel - ระดับการจัดเก็บที่จะใช้ในการจัดเก็บวัตถุที่ได้รับ
KafkaUtils API มีวิธีการ createDirectStream อีกวิธีหนึ่งซึ่งใช้ในการสร้างสตรีมอินพุตที่ดึงข้อความจาก Kafka Brokers โดยตรงโดยไม่ต้องใช้เครื่องรับใด ๆ สตรีมนี้สามารถรับประกันได้ว่าแต่ละข้อความจาก Kafka จะรวมอยู่ในการเปลี่ยนแปลงเพียงครั้งเดียว
แอปพลิเคชันตัวอย่างทำได้ใน Scala ในการรวบรวมแอปพลิเคชันโปรดดาวน์โหลดและติดตั้งsbt
, scala build tool (คล้ายกับ maven) รหัสแอปพลิเคชันหลักแสดงอยู่ด้านล่าง
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
สร้างสคริปต์
การผสานรวม spark-kafka ขึ้นอยู่กับการจุดประกายการสตรีมและจุดประกายโถการรวม Kafka สร้างไฟล์ใหม่build.sbt
และระบุรายละเอียดแอปพลิเคชันและการอ้างอิง SBT
จะดาวน์โหลดขวดที่จำเป็นในขณะที่รวบรวมและการบรรจุแอพลิเคชัน
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
การรวบรวม / บรรจุภัณฑ์
รันคำสั่งต่อไปนี้เพื่อคอมไพล์และแพ็กเกจไฟล์ jar ของแอ็พพลิเคชัน เราจำเป็นต้องส่งไฟล์ jar ไปยังคอนโซล spark เพื่อเรียกใช้แอปพลิเคชัน
sbt package
ส่งไปยัง Spark
เริ่ม Kafka Producer CLI (อธิบายในบทก่อนหน้า) สร้างหัวข้อใหม่ชื่อmy-first-topic
และให้ข้อความตัวอย่างตามที่แสดงด้านล่าง
Another spark test message
เรียกใช้คำสั่งต่อไปนี้เพื่อส่งแอปพลิเคชันไปยัง spark Console
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
ผลลัพธ์ตัวอย่างของแอปพลิเคชันนี้แสดงไว้ด้านล่าง
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..