Apache Kafka - Spark ile Entegrasyon

Bu bölümde Apache Kafka'nın Spark Streaming API ile nasıl entegre edileceğini tartışacağız.

Spark hakkında

Spark Streaming API, canlı veri akışlarının ölçeklenebilir, yüksek verimli, hataya dayanıklı akış işlemesini sağlar. Veriler, Kafka, Flume, Twitter vb. Birçok kaynaktan alınabilir ve harita, azaltma, birleştirme ve pencere gibi üst düzey işlevler gibi karmaşık algoritmalar kullanılarak işlenebilir. Son olarak, işlenen veriler dosya sistemlerine, veri tabanlarına ve canlı gösterge panolarına gönderilebilir. Esnek Dağıtılmış Veri Kümeleri (RDD), Spark'ın temel bir veri yapısıdır. Değişmez dağıtılmış nesneler koleksiyonudur. RDD'deki her veri kümesi, kümenin farklı düğümlerinde hesaplanabilen mantıksal bölümlere bölünmüştür.

Spark ile entegrasyon

Kafka, Spark akışı için potansiyel bir mesajlaşma ve entegrasyon platformudur. Kafka, gerçek zamanlı veri akışları için merkezi bir merkez görevi görür ve Spark Streaming'de karmaşık algoritmalar kullanılarak işlenir. Veriler işlendikten sonra, Spark Streaming sonuçları başka bir Kafka konusunda yayınlıyor olabilir veya HDFS'de, veritabanlarında veya kontrol panellerinde depolayabilir. Aşağıdaki şema kavramsal akışı göstermektedir.

Şimdi Kafka-Spark API'lerini detaylı olarak inceleyelim.

SparkConf API

Bir Spark uygulaması için yapılandırmayı temsil eder. Çeşitli Spark parametrelerini anahtar-değer çiftleri olarak ayarlamak için kullanılır.

SparkConf sınıfı aşağıdaki yöntemlere sahiptir -

  • set(string key, string value) - yapılandırma değişkenini ayarlayın.

  • remove(string key) - anahtarı yapılandırmadan çıkarın.

  • setAppName(string name) - uygulamanız için uygulama adını ayarlayın.

  • get(string key) - anahtarı al

StreamingContext API

Bu, Spark işlevselliği için ana giriş noktasıdır. SparkContext, bir Spark kümesine olan bağlantıyı temsil eder ve kümede RDD'ler, toplayıcılar ve yayın değişkenleri oluşturmak için kullanılabilir. İmza aşağıda gösterildiği gibi tanımlanır.

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - bağlanılacak küme URL'si (ör. Mesos: // ana bilgisayar: bağlantı noktası, kıvılcım: // ana bilgisayar: bağlantı noktası, yerel [4]).

  • appName - küme web kullanıcı arayüzünde görüntülenecek işinizin adı

  • batchDuration - akış verilerinin gruplara bölüneceği zaman aralığı

public StreamingContext(SparkConf conf, Duration batchDuration)

Yeni bir SparkContext için gerekli yapılandırmayı sağlayarak bir StreamingContext oluşturun.

  • conf - Kıvılcım parametreleri

  • batchDuration - akış verilerinin gruplara bölüneceği zaman aralığı

KafkaUtils API

KafkaUtils API, Kafka kümesini Spark akışına bağlamak için kullanılır. Bu API, aşağıdaki gibi tanımlanan createStream imzasının önemli yöntemine sahiptir .

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

Yukarıda gösterilen yöntem, Kafka Brokers'tan mesajları çeken bir giriş akışı oluşturmak için kullanılır.

  • ssc - StreamingContext nesnesi.

  • zkQuorum - Hayvan bakıcısı yeter sayısı.

  • groupId - Bu tüketicinin grup kimliği.

  • topics - tüketilecek konuların bir haritasını döndürür.

  • storageLevel - Alınan nesneleri saklamak için kullanılacak depolama seviyesi.

KafkaUtils API, herhangi bir alıcı kullanmadan Kafka Brokers'tan doğrudan mesaj çeken bir giriş akışı oluşturmak için kullanılan başka bir createDirectStream yöntemine sahiptir. Bu akış, Kafka'dan gelen her mesajın dönüşümlere tam olarak bir kez dahil edilmesini garanti edebilir.

Örnek uygulama Scala'da yapılmıştır. Uygulamayı derlemek için lütfen sbt , scala build tool'u ( maven'e benzer) indirip kurun . Ana uygulama kodu aşağıda sunulmuştur.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Komut Dosyası Oluştur

Kıvılcım-kafka entegrasyonu kıvılcım, kıvılcım akışı ve kıvılcım Kafka entegrasyon kavanozuna bağlıdır. Yeni bir build.sbt dosyası oluşturun ve uygulama ayrıntılarını ve bağımlılığını belirtin. Sbt derleme ve uygulama paketleme sırasında gerekli kavanoz indirecektir.

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Derleme / Paketleme

Uygulamanın jar dosyasını derlemek ve paketlemek için aşağıdaki komutu çalıştırın. Uygulamayı çalıştırmak için jar dosyasını spark konsoluna göndermemiz gerekiyor.

sbt package

Spark'a gönderiliyor

Kafka Producer CLI'yi başlatın (önceki bölümde açıklanmıştır), my-first-topic adlı yeni bir konu oluşturun ve aşağıda gösterildiği gibi bazı örnek mesajlar sağlayın.

Another spark test message

Uygulamayı spark konsoluna göndermek için aşağıdaki komutu çalıştırın.

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

Bu uygulamanın örnek çıktısı aşağıda gösterilmiştir.

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..