Apache Kafka - Integrasi Dengan Spark

Pada bab ini, kita akan membahas tentang bagaimana mengintegrasikan Apache Kafka dengan API Streaming Spark.

Tentang Spark

Spark Streaming API memungkinkan pemrosesan aliran data langsung yang dapat diskalakan, throughput tinggi, dan toleran terhadap kesalahan. Data dapat diserap dari banyak sumber seperti Kafka, Flume, Twitter, dll., Dan dapat diproses menggunakan algoritme kompleks seperti fungsi tingkat tinggi seperti peta, reduksi, gabung, dan jendela. Terakhir, data yang diproses dapat dikirim ke sistem file, database, dan papan dasbor langsung. Set Data Terdistribusi Tangguh (RDD) adalah struktur data fundamental dari Spark. Ini adalah kumpulan objek terdistribusi yang tidak dapat diubah. Setiap set data di RDD dibagi menjadi beberapa partisi logis, yang dapat dihitung pada node cluster yang berbeda.

Integrasi dengan Spark

Kafka adalah platform perpesanan dan integrasi potensial untuk streaming Spark. Kafka bertindak sebagai hub pusat untuk aliran data real-time dan diproses menggunakan algoritme kompleks di Spark Streaming. Setelah data diproses, Spark Streaming dapat mempublikasikan hasil ke dalam topik Kafka lain atau menyimpan di HDFS, database atau dasbor. Diagram berikut menggambarkan aliran konseptual.

Sekarang, mari kita lihat API Kafka-Spark secara detail.

SparkConf API

Ini mewakili konfigurasi untuk aplikasi Spark. Digunakan untuk menyetel berbagai parameter Spark sebagai pasangan nilai-kunci.

Kelas SparkConf memiliki metode berikut -

  • set(string key, string value) - mengatur variabel konfigurasi.

  • remove(string key) - hapus kunci dari konfigurasi.

  • setAppName(string name) - atur nama aplikasi untuk aplikasi Anda.

  • get(string key) - ambil kunci

API StreamingContext

Ini adalah titik masuk utama untuk fungsionalitas Spark. SparkContext merepresentasikan koneksi ke cluster Spark, dan dapat digunakan untuk membuat RDD, akumulator, dan variabel siaran di cluster. Tanda tangan didefinisikan seperti yang ditunjukkan di bawah ini.

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - URL cluster yang akan disambungkan (misalnya mesos: // host: port, spark: // host: port, local [4]).

  • appName - nama untuk pekerjaan Anda, untuk ditampilkan di UI web cluster

  • batchDuration - interval waktu di mana data streaming akan dibagi menjadi beberapa batch

public StreamingContext(SparkConf conf, Duration batchDuration)

Buat StreamingContext dengan menyediakan konfigurasi yang diperlukan untuk SparkContext baru.

  • conf - Parameter percikan

  • batchDuration - interval waktu di mana data streaming akan dibagi menjadi beberapa batch

KafkaUtils API

KafkaUtils API digunakan untuk menghubungkan cluster Kafka ke streaming Spark. API ini memiliki tanda tangan createStream metode signifikan yang didefinisikan seperti di bawah ini.

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

Metode yang ditunjukkan di atas digunakan untuk membuat aliran input yang menarik pesan dari Broker Kafka.

  • ssc - Objek StreamingContext.

  • zkQuorum - Kuorum penjaga kebun binatang.

  • groupId - ID grup untuk konsumen ini.

  • topics - mengembalikan peta topik untuk dikonsumsi.

  • storageLevel - Tingkat penyimpanan yang digunakan untuk menyimpan objek yang diterima.

KafkaUtils API memiliki metode lain createDirectStream, yang digunakan untuk membuat aliran input yang secara langsung menarik pesan dari Kafka Brokers tanpa menggunakan penerima apa pun. Aliran ini dapat menjamin bahwa setiap pesan dari Kafka disertakan dalam transformasi tepat satu kali.

Aplikasi sampel dilakukan di Scala. Untuk mengkompilasi aplikasi, silahkan download dan install sbt , scala build tool (mirip dengan maven). Kode aplikasi utama disajikan di bawah ini.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Build Script

Integrasi spark-kafka bergantung pada percikan, streaming percikan, dan percikan Kafka integrasi jar. Buat file baru build.sbt dan tentukan detail aplikasi dan ketergantungannya. The sbt akan men-download jar diperlukan saat kompilasi dan kemasan aplikasi.

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Kompilasi / Pengemasan

Jalankan perintah berikut untuk mengompilasi dan mengemas file jar aplikasi. Kita perlu mengirimkan file jar ke konsol percikan untuk menjalankan aplikasi.

sbt package

Mengirimkan ke Spark

Mulai CLI Produser Kafka (dijelaskan di bab sebelumnya), buat topik baru bernama my-first-topic dan berikan beberapa contoh pesan seperti yang ditunjukkan di bawah ini.

Another spark test message

Jalankan perintah berikut untuk mengirimkan aplikasi ke konsol percikan.

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

Output sampel dari aplikasi ini ditampilkan di bawah ini.

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..