Apache Kafka - Spark ile Entegrasyon
Bu bölümde Apache Kafka'nın Spark Streaming API ile nasıl entegre edileceğini tartışacağız.
Spark hakkında
Spark Streaming API, canlı veri akışlarının ölçeklenebilir, yüksek verimli, hataya dayanıklı akış işlemesini sağlar. Veriler, Kafka, Flume, Twitter vb. Birçok kaynaktan alınabilir ve harita, azaltma, birleştirme ve pencere gibi üst düzey işlevler gibi karmaşık algoritmalar kullanılarak işlenebilir. Son olarak, işlenen veriler dosya sistemlerine, veri tabanlarına ve canlı gösterge panolarına gönderilebilir. Esnek Dağıtılmış Veri Kümeleri (RDD), Spark'ın temel bir veri yapısıdır. Değişmez dağıtılmış nesneler koleksiyonudur. RDD'deki her veri kümesi, kümenin farklı düğümlerinde hesaplanabilen mantıksal bölümlere bölünmüştür.
Spark ile entegrasyon
Kafka, Spark akışı için potansiyel bir mesajlaşma ve entegrasyon platformudur. Kafka, gerçek zamanlı veri akışları için merkezi bir merkez görevi görür ve Spark Streaming'de karmaşık algoritmalar kullanılarak işlenir. Veriler işlendikten sonra, Spark Streaming sonuçları başka bir Kafka konusunda yayınlıyor olabilir veya HDFS'de, veritabanlarında veya kontrol panellerinde depolayabilir. Aşağıdaki şema kavramsal akışı göstermektedir.
Şimdi Kafka-Spark API'lerini detaylı olarak inceleyelim.
SparkConf API
Bir Spark uygulaması için yapılandırmayı temsil eder. Çeşitli Spark parametrelerini anahtar-değer çiftleri olarak ayarlamak için kullanılır.
SparkConf
sınıfı aşağıdaki yöntemlere sahiptir -
set(string key, string value) - yapılandırma değişkenini ayarlayın.
remove(string key) - anahtarı yapılandırmadan çıkarın.
setAppName(string name) - uygulamanız için uygulama adını ayarlayın.
get(string key) - anahtarı al
StreamingContext API
Bu, Spark işlevselliği için ana giriş noktasıdır. SparkContext, bir Spark kümesine olan bağlantıyı temsil eder ve kümede RDD'ler, toplayıcılar ve yayın değişkenleri oluşturmak için kullanılabilir. İmza aşağıda gösterildiği gibi tanımlanır.
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master - bağlanılacak küme URL'si (ör. Mesos: // ana bilgisayar: bağlantı noktası, kıvılcım: // ana bilgisayar: bağlantı noktası, yerel [4]).
appName - küme web kullanıcı arayüzünde görüntülenecek işinizin adı
batchDuration - akış verilerinin gruplara bölüneceği zaman aralığı
public StreamingContext(SparkConf conf, Duration batchDuration)
Yeni bir SparkContext için gerekli yapılandırmayı sağlayarak bir StreamingContext oluşturun.
conf - Kıvılcım parametreleri
batchDuration - akış verilerinin gruplara bölüneceği zaman aralığı
KafkaUtils API
KafkaUtils API, Kafka kümesini Spark akışına bağlamak için kullanılır. Bu API, aşağıdaki gibi tanımlanan createStream
imzasının önemli yöntemine sahiptir
.
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
Yukarıda gösterilen yöntem, Kafka Brokers'tan mesajları çeken bir giriş akışı oluşturmak için kullanılır.
ssc - StreamingContext nesnesi.
zkQuorum - Hayvan bakıcısı yeter sayısı.
groupId - Bu tüketicinin grup kimliği.
topics - tüketilecek konuların bir haritasını döndürür.
storageLevel - Alınan nesneleri saklamak için kullanılacak depolama seviyesi.
KafkaUtils API, herhangi bir alıcı kullanmadan Kafka Brokers'tan doğrudan mesaj çeken bir giriş akışı oluşturmak için kullanılan başka bir createDirectStream yöntemine sahiptir. Bu akış, Kafka'dan gelen her mesajın dönüşümlere tam olarak bir kez dahil edilmesini garanti edebilir.
Örnek uygulama Scala'da yapılmıştır. Uygulamayı derlemek için lütfen sbt
, scala build tool'u
( maven'e
benzer) indirip kurun . Ana uygulama kodu aşağıda sunulmuştur.
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Komut Dosyası Oluştur
Kıvılcım-kafka entegrasyonu kıvılcım, kıvılcım akışı ve kıvılcım Kafka entegrasyon kavanozuna bağlıdır. Yeni bir build.sbt
dosyası oluşturun
ve uygulama ayrıntılarını ve bağımlılığını belirtin. Sbt
derleme ve uygulama paketleme sırasında gerekli kavanoz indirecektir.
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
Derleme / Paketleme
Uygulamanın jar dosyasını derlemek ve paketlemek için aşağıdaki komutu çalıştırın. Uygulamayı çalıştırmak için jar dosyasını spark konsoluna göndermemiz gerekiyor.
sbt package
Spark'a gönderiliyor
Kafka Producer CLI'yi başlatın (önceki bölümde açıklanmıştır), my-first-topic
adlı yeni bir konu oluşturun ve aşağıda gösterildiği gibi bazı örnek mesajlar sağlayın.
Another spark test message
Uygulamayı spark konsoluna göndermek için aşağıdaki komutu çalıştırın.
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
Bu uygulamanın örnek çıktısı aşağıda gösterilmiştir.
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..