अपाचे काफ्का - स्पार्क के साथ एकीकरण
इस अध्याय में, हम स्पार्क स्ट्रीमिंग एपीआई के साथ अपाचे काफ्का को एकीकृत करने के तरीके के बारे में चर्चा करेंगे।
स्पार्क के बारे में
स्पार्क स्ट्रीमिंग एपीआई स्केलेबल, उच्च-थ्रूपुट, लाइव डेटा धाराओं के दोष-सहिष्णु स्ट्रीम प्रसंस्करण को सक्षम करता है। डेटा को काफ्का, फ्लूम, ट्विटर, आदि जैसे कई स्रोतों से प्राप्त किया जा सकता है, और जटिल एल्गोरिदम जैसे कि उच्च-स्तरीय कार्यों जैसे मानचित्र, कम करना, जुड़ना और खिड़की का उपयोग करके संसाधित किया जा सकता है। अंत में, संसाधित डेटा को फाइल सिस्टम, डेटाबेस और लाइव डैश-बोर्ड पर धकेल दिया जा सकता है। रेसिलिएंट डिस्ट्रिब्यूटेड डेटसेट्स (आरडीडी) स्पार्क की एक मूलभूत डेटा संरचना है। यह वस्तुओं का एक अपरिवर्तित वितरित संग्रह है। RDD में प्रत्येक डेटासेट को तार्किक विभाजन में विभाजित किया गया है, जिसे क्लस्टर के विभिन्न नोड्स पर गणना की जा सकती है।
स्पार्क के साथ एकीकरण
काफ्का स्पार्क स्ट्रीमिंग के लिए एक संभावित संदेश और एकीकरण मंच है। काफ्का डेटा के वास्तविक समय की धाराओं के लिए केंद्रीय केंद्र के रूप में कार्य करता है और स्पार्क स्ट्रीमिंग में जटिल एल्गोरिदम का उपयोग करके संसाधित किया जाता है। एक बार डेटा संसाधित हो जाने के बाद, स्पार्क स्ट्रीमिंग एचडीएफएस, डेटाबेस या डैशबोर्ड में अभी तक किसी अन्य काफ़्का विषय या स्टोर में परिणाम प्रकाशित कर सकती है। निम्नलिखित चित्र में वैचारिक प्रवाह को दर्शाया गया है।
अब, कफका-स्पार्क एपीआई के बारे में विस्तार से जानते हैं।
स्पार्ककोन एपीआई
यह स्पार्क एप्लिकेशन के लिए कॉन्फ़िगरेशन का प्रतिनिधित्व करता है। विभिन्न स्पार्क मापदंडों को कुंजी-मूल्य जोड़े के रूप में सेट करने के लिए उपयोग किया जाता है।
SparkConf
वर्ग के निम्नलिखित तरीके हैं -
set(string key, string value) - कॉन्फ़िगरेशन चर सेट करें।
remove(string key) - कॉन्फ़िगरेशन से कुंजी निकालें।
setAppName(string name) - अपने आवेदन के लिए आवेदन नाम सेट करें।
get(string key) - कुंजी प्राप्त करें
StreamingContext API
यह स्पार्क कार्यक्षमता के लिए मुख्य प्रवेश बिंदु है। SparkContext एक स्पार्क क्लस्टर से कनेक्शन का प्रतिनिधित्व करता है, और इसका उपयोग क्लस्टर पर RDDs, संचायक और प्रसारण चर बनाने के लिए किया जा सकता है। हस्ताक्षर को नीचे दिखाए अनुसार परिभाषित किया गया है।
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master - कनेक्ट करने के लिए क्लस्टर URL (जैसे मेसोस: // होस्ट: पोर्ट, स्पार्क: // होस्ट: पोर्ट, लोकल [4])।
appName - क्लस्टर वेब UI पर प्रदर्शित करने के लिए आपकी नौकरी का एक नाम
batchDuration - समय अंतराल जिस पर स्ट्रीमिंग डेटा को बैचों में विभाजित किया जाएगा
public StreamingContext(SparkConf conf, Duration batchDuration)
नई स्पार्ककॉन्टेक्ट के लिए आवश्यक कॉन्फ़िगरेशन प्रदान करके एक स्ट्रीमिंग कॉन्टेक्स्ट बनाएं।
conf - स्पार्क पैरामीटर
batchDuration - समय अंतराल जिस पर स्ट्रीमिंग डेटा को बैचों में विभाजित किया जाएगा
काफ्काटिल्स एपीआई
KafkaUtils API का उपयोग काफ्का क्लस्टर को स्पार्क स्ट्रीमिंग से जोड़ने के लिए किया जाता है। इस एपीआई में नीचे की तरह परिभाषित साइन
-कैंट विधि createStream
हस्ताक्षर है।
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
ऊपर दिखाए गए तरीके का उपयोग एक इनपुट स्ट्रीम बनाने के लिए किया जाता है जो काफ्का ब्रोकर्स के संदेशों को खींचता है।
ssc - StreamingContext ऑब्जेक्ट।
zkQuorum - ज़ूकीपर कोरम।
groupId - इस उपभोक्ता के लिए ग्रुप आई.डी.
topics - उपभोग करने के लिए विषयों का एक नक्शा लौटाएं।
storageLevel - प्राप्त वस्तुओं के भंडारण के लिए उपयोग करने के लिए संग्रहण स्तर।
KafkaUtils API के पास एक और तरीका createDirectStream है, जिसका उपयोग एक इनपुट स्ट्रीम बनाने के लिए किया जाता है जो किसी भी रिसीवर का उपयोग किए बिना सीधे Kafka Brokers के संदेशों को खींचता है। यह धारा इस बात की गारंटी दे सकती है कि काफ़्का का प्रत्येक संदेश बिल्कुल एक बार परिवर्तनों में शामिल है।
नमूना आवेदन स्काला में किया जाता है। एप्लिकेशन को संकलित करने के लिए, कृपया sbt
, scala बिल्ड टूल ( मावेन के
समान) डाउनलोड और इंस्टॉल करें । मुख्य आवेदन कोड नीचे प्रस्तुत किया गया है।
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
स्क्रिप्ट बनाएँ
स्पार्क-काफ्का एकीकरण स्पार्क, स्पार्क स्ट्रीमिंग और स्पार्क कफका एकीकरण जार पर निर्भर करता है। एक नई फ़ाइल build.sbt
बनाएं और एप्लिकेशन विवरण और उसकी निर्भरता निर्दिष्ट करें। एसबीटी
जबकि संकलन और आवेदन पैकिंग आवश्यक जार डाउनलोड करेगा।
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
संकलन / पैकेजिंग
अनुप्रयोग के जार फ़ाइल को संकलित करने और पैकेज करने के लिए निम्नलिखित कमांड चलाएँ। हमें एप्लिकेशन को चलाने के लिए जार फ़ाइल को स्पार्क कंसोल में जमा करना होगा।
sbt package
स्पार्क के लिए प्रस्तुत करना
काफ्का निर्माता सीएलआई (पिछले अध्याय में समझाया गया) शुरू करें, मेरा-पहला-विषय
नामक एक नया विषय बनाएं और नीचे दिखाए गए अनुसार कुछ नमूना संदेश प्रदान करें।
Another spark test message
स्पार्क कंसोल को एप्लिकेशन सबमिट करने के लिए निम्न कमांड चलाएँ।
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
इस एप्लिकेशन का नमूना आउटपुट नीचे दिखाया गया है।
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..