अपाचे काफ्का - त्वरित गाइड

बिग डेटा में, डेटा की एक विशाल मात्रा का उपयोग किया जाता है। डेटा के बारे में, हमारे पास दो मुख्य चुनौतियां हैं। पहली चुनौती यह है कि बड़ी मात्रा में डेटा कैसे एकत्र किया जाए और दूसरी चुनौती है एकत्रित डेटा का विश्लेषण करना। उन चुनौतियों से पार पाने के लिए, आपको एक संदेश प्रणाली की आवश्यकता होगी।

कफका वितरित उच्च थ्रूपुट प्रणालियों के लिए डिज़ाइन किया गया है। काफ्का अधिक पारंपरिक संदेश ब्रोकर के प्रतिस्थापन के रूप में बहुत अच्छी तरह से काम करता है। अन्य संदेश प्रणालियों की तुलना में, काफ्का में बेहतर थ्रूपुट, अंतर्निहित विभाजन, प्रतिकृति और निहित दोष-सहिष्णुता है, जो इसे बड़े पैमाने पर संदेश प्रसंस्करण अनुप्रयोगों के लिए एक अच्छा फिट बनाता है।

मैसेजिंग सिस्टम क्या है?

एक संदेश प्रणाली एक अनुप्रयोग से दूसरे में डेटा स्थानांतरित करने के लिए जिम्मेदार है, इसलिए एप्लिकेशन डेटा पर ध्यान केंद्रित कर सकते हैं, लेकिन इसे साझा करने के तरीके के बारे में चिंता न करें। वितरित संदेश विश्वसनीय संदेश कतार की अवधारणा पर आधारित है। संदेश क्लाइंट अनुप्रयोग और संदेश प्रणाली के बीच अतुल्यकालिक रूप से कतारबद्ध हैं। दो प्रकार के मैसेजिंग पैटर्न उपलब्ध हैं - एक है पॉइंट टू पॉइंट और दूसरा है पब्लिश-सब्सक्रिप्शन (पब-सब) मैसेजिंग सिस्टम। ज्यादातर मैसेजिंग पैटर्न फॉलो करते हैंpub-sub

प्वाइंट टू प्वाइंट मैसेजिंग सिस्टम

एक बिंदु से बिंदु प्रणाली में, संदेश एक कतार में बने रहते हैं। एक या अधिक उपभोक्ता कतार में मौजूद संदेशों का उपभोग कर सकते हैं, लेकिन एक विशेष संदेश का उपभोग अधिकतम एक उपभोक्ता ही कर सकता है। एक बार जब कोई उपभोक्ता कतार में कोई संदेश पढ़ता है, तो वह उस कतार से गायब हो जाता है। इस प्रणाली का विशिष्ट उदाहरण एक ऑर्डर प्रोसेसिंग सिस्टम है, जहां प्रत्येक ऑर्डर को एक ऑर्डर प्रोसेसर द्वारा संसाधित किया जाएगा, लेकिन एकाधिक ऑर्डर प्रोसेसर एक ही समय में भी काम कर सकते हैं। निम्नलिखित चित्र संरचना को दर्शाता है।

प्रकाशन-सदस्यता संदेश प्रणाली

प्रकाशन-सदस्यता प्रणाली में, संदेश एक विषय में बने रहते हैं। पॉइंट-टू-पॉइंट सिस्टम के विपरीत, उपभोक्ता एक या अधिक विषय की सदस्यता ले सकते हैं और उस विषय के सभी संदेशों का उपभोग कर सकते हैं। प्रकाशन-सदस्यता प्रणाली में, संदेश निर्माता को प्रकाशक कहा जाता है और संदेश उपभोक्ताओं को ग्राहक कहा जाता है। एक वास्तविक जीवन का उदाहरण डिश टीवी है, जो विभिन्न चैनलों जैसे खेल, सिनेमा, संगीत आदि को प्रकाशित करता है, और कोई भी अपने स्वयं के चैनल की सदस्यता ले सकता है और जब भी उनके सदस्यता प्राप्त चैनल उपलब्ध हैं, उन्हें प्राप्त कर सकता है।

काफ्का क्या है?

अपाचे काफ्का एक वितरित प्रकाशित-सदस्यता संदेश प्रणाली और एक मजबूत कतार है जो डेटा की एक उच्च मात्रा को संभाल सकती है और आपको एक छोर से दूसरे तक संदेश भेजने में सक्षम बनाती है। काफ्का ऑफ़लाइन और ऑनलाइन संदेश खपत दोनों के लिए उपयुक्त है। कफ़्का संदेशों को डिस्क पर जारी रखा जाता है और डेटा हानि को रोकने के लिए क्लस्टर के भीतर दोहराया जाता है। काफ्का ज़ूकीर सिंक्रोनाइज़ेशन सर्विस के शीर्ष पर बनाया गया है। यह वास्तविक समय के स्ट्रीमिंग डेटा विश्लेषण के लिए अपाचे स्टॉर्म और स्पार्क के साथ बहुत अच्छी तरह से एकीकृत करता है।

लाभ

काफ्का के कुछ लाभ निम्नलिखित हैं -

  • Reliability - काफ्का वितरित, विभाजन, प्रतिकृति और दोष सहिष्णुता है।

  • Scalability - काफ्का मैसेजिंग सिस्टम बिना समय के आसानी से तराजू ।।

  • Durability- काफ्का डिस्ट्रीब्यूटेड कम लॉग का उपयोग करता है, जिसका अर्थ है कि संदेश डिस्क पर जितनी जल्दी हो सके, इसलिए टिकाऊ है।

  • Performance- कफ़्का में संदेशों को प्रकाशित करने और सदस्यता देने दोनों के लिए उच्च प्रवाह है। यह स्थिर प्रदर्शन को बनाए रखता है यहां तक ​​कि कई टीबी संदेशों को संग्रहीत किया जाता है।

काफ्का बहुत तेज है और शून्य डाउनटाइम और शून्य डेटा हानि की गारंटी देता है।

बक्सों का इस्तेमाल करें

काफ्का का उपयोग कई उपयोग मामलों में किया जा सकता है। उनमें से कुछ नीचे सूचीबद्ध हैं -

  • Metrics- कफका अक्सर परिचालन निगरानी डेटा के लिए उपयोग किया जाता है। इसमें परिचालन डेटा के केंद्रीकृत फ़ीड का उत्पादन करने के लिए वितरित अनुप्रयोगों के एकत्रित आंकड़े शामिल हैं।

  • Log Aggregation Solution - कफ़्का का उपयोग कई सेवाओं से लॉग एकत्र करने और उन्हें मानक प्रारूप में कई कॉन-समर में उपलब्ध कराने के लिए एक संगठन में किया जा सकता है।

  • Stream Processing- स्टॉर्म और स्पार्क स्ट्रीमिंग जैसे लोकप्रिय ढांचे एक विषय के डेटा को पढ़ते हैं, इसे संसाधित करते हैं, और एक नए विषय पर संसाधित डेटा लिखते हैं जहां यह उपयोगकर्ताओं और अनुप्रयोगों के लिए उपलब्ध हो जाता है। धारा प्रसंस्करण के संदर्भ में काफ्का का मजबूत स्थायित्व भी बहुत उपयोगी है।

कफका की जरूरत है

कफका सभी वास्तविक समय के डेटा फीड को संभालने के लिए एक एकीकृत मंच है। काफ्का कम विलंबता संदेश वितरण का समर्थन करता है और मशीन विफलताओं की उपस्थिति में गलती सहिष्णुता की गारंटी देता है। इसमें बड़ी संख्या में विविध उपभोक्ताओं को संभालने की क्षमता है। काफ़्का बहुत तेज़ है, 2 मिलियन लिखता है / सेकंड। काफ्का डिस्क में सभी डेटा को बनाए रखता है, जिसका अनिवार्य रूप से मतलब है कि सभी राइट्स ओएस (रैम) के पेज कैश पर जाते हैं। यह पेज कैश से नेटवर्क सॉकेट में डेटा स्थानांतरित करने के लिए बहुत कुशल बनाता है।

काफ्का में गहराई से जाने से पहले, आपको मुख्य शब्दावली जैसे विषय, दलाल, निर्माता और उपभोक्ता के बारे में पता होना चाहिए। निम्नलिखित आरेख मुख्य शब्दावली को दिखाता है और तालिका आरेख घटकों का विस्तार से वर्णन करती है।

उपरोक्त आरेख में, एक विषय को तीन विभाजनों में कॉन्फ़िगर किया गया है। विभाजन 1 में दो ऑफसेट कारक 0 और 1. विभाजन 2 में चार ऑफसेट कारक 0, 1, 2 और 3 हैं। विभाजन 3 में एक ऑफसेट कारक 0. है। प्रतिकृति की आईडी उसी सर्वर की आईडी के समान है जो इसे होस्ट करता है।

मान लें, यदि विषय का प्रतिकृति कारक 3 पर सेट है, तो काफ्का प्रत्येक विभाजन के 3 समान प्रतिकृतियां बनाएगा और उन्हें अपने सभी कार्यों के लिए उपलब्ध कराने के लिए क्लस्टर में रखेगा। क्लस्टर में लोड को संतुलित करने के लिए, प्रत्येक ब्रोकर उन विभाजनों में से एक या अधिक को संग्रहीत करता है। एक ही समय में कई निर्माता और उपभोक्ता संदेशों को प्रकाशित और पुनः प्राप्त कर सकते हैं।

S.No अवयव और विवरण
1

Topics

किसी विशेष श्रेणी से संबंधित संदेशों की एक धारा को एक विषय कहा जाता है। डेटा विषयों में संग्रहीत किया जाता है।

विषय विभाजन में विभाजित हैं। प्रत्येक विषय के लिए, काफ्का एक विभाजन का एक मिनी-मम रखता है। इस तरह के प्रत्येक विभाजन में अपरिवर्तनीय क्रम में संदेश होते हैं। एक विभाजन को समान आकारों के खंड फ़ाइलों के सेट के रूप में कार्यान्वित किया जाता है।

2

Partition

विषय में कई विभाजन हो सकते हैं, इसलिए यह डेटा की एक मनमानी राशि को संभाल सकता है।

3

Partition offset

प्रत्येक विभाजित संदेश में एक अद्वितीय अनुक्रम आईडी है जिसे ऑफसेट कहा जाता है ।

4

Replicas of partition

प्रतिकृतियां विभाजन के बैकअप के अलावा और कुछ नहीं हैं । प्रतिकृतियां कभी भी डेटा को पढ़ने या लिखने की नहीं होती हैं। उनका उपयोग डेटा हानि को रोकने के लिए किया जाता है।

5

Brokers

  • दलाल पब-लाइन किए गए डेटा को बनाए रखने के लिए जिम्मेदार सरल प्रणाली हैं। प्रत्येक ब्रोकर के पास प्रति विषय शून्य या अधिक विभाजन हो सकते हैं। मान लें, यदि किसी विषय में N विभाजन और दलालों की संख्या N है, तो प्रत्येक दलाल का एक विभाजन होगा।

  • मान लें कि किसी विषय में N विभाजन हैं और N दलालों (n + m) से अधिक हैं, तो पहले N दलाल के पास एक विभाजन होगा और अगले M दलाल के पास उस विशेष विषय के लिए कोई विभाजन नहीं होगा।

  • मान लें कि किसी विषय में N विभाजन हैं और N दलालों (nm) से कम हैं, तो प्रत्येक दलाल के पास उनके साथ एक या अधिक विभाजन साझाकरण होगा। ब्रोकर के बीच असमान लोड डिस्ट्री-ब्यूटेन के कारण इस परिदृश्य की सिफारिश नहीं की जाती है।

6

Kafka Cluster

काफ्का के एक से अधिक ब्रोकर होने के कारण उसे काफ्का क्लस्टर कहा जाता है। एक काफ्का क्लस्टर को बिना डाउनटाइम के विस्तारित किया जा सकता है। ये क्लस्टर संदेश डेटा की दृढ़ता और प्रतिकृति का प्रबंधन करने के लिए उपयोग किए जाते हैं।

7

Producers

निर्माता एक या अधिक काफ्का विषयों के संदेशों के प्रकाशक हैं। निर्माता कफका दलालों को डेटा भेजते हैं। जब भी कोई प्रोड्यूसर किसी ब्रोकर को मैसेज देता है, ब्रोकर मैसेज को लास्ट सेगमेंट फाइल में डाल देता है। दरअसल, मैसेज को एक पार्टीशन में जोड़ा जाएगा। निर्माता अपनी पसंद के विभाजन के लिए संदेश भी भेज सकते हैं।

8

Consumers

उपभोक्ता दलालों से डेटा पढ़ते हैं। उपभोक्ता एक या अधिक विषयों की सदस्यता लेते हैं और दलालों से डेटा खींचकर प्रकाशित संदेशों का उपभोग करते हैं।

9

Leader

लीडर सभी विभाजन के लिए जिम्मेदार नोड है और दिए गए विभाजन के लिए लिखता है। हर विभाजन में एक सर्वर एक नेता के रूप में कार्य करता है।

10

Follower

नेता निर्देशों का पालन करने वाले नोड को अनुयायी कहा जाता है। यदि नेता विफल हो जाता है, तो अनुयायी में से एक स्वचालित रूप से नया नेता बन जाएगा। एक अनुयायी सामान्य उपभोक्ता के रूप में कार्य करता है, संदेशों को खींचता है और अपने स्वयं के डेटा स्टोर को अद्यतित करता है।

निम्नलिखित दृष्टांत पर एक नज़र डालें। यह काफ्का के क्लस्टर आरेख को दर्शाता है।

निम्न तालिका उपरोक्त आरेख में दिखाए गए प्रत्येक घटक का वर्णन करती है।

S.No अवयव और विवरण
1

Broker

लोड संतुलन बनाए रखने के लिए काफ्का क्लस्टर में आमतौर पर कई ब्रोकर होते हैं। काफ्का दलाल स्टेटलेस हैं, इसलिए वे अपने क्लस्टर राज्य को बनाए रखने के लिए ज़ूकेपर का उपयोग करते हैं। एक काफ्का ब्रोकर उदाहरण सैकड़ों-हजारों रीड्स को संभाल सकता है और प्रति सेकंड लिखता है और प्रत्येक ब्रो-केर प्रदर्शन प्रभाव के बिना संदेशों के टीबी को संभाल सकता है। काफ्का दलाल नेता का चुनाव ज़ूकीपर द्वारा किया जा सकता है।

2

ZooKeeper

ज़ूकेपर का उपयोग काफ्का दलाल के प्रबंधन और समन्वय के लिए किया जाता है। चिड़ियाघर कीपर सेवा का उपयोग मुख्य रूप से निर्माता और उपभोक्ता को काफ्का प्रणाली में किसी नए दलाल की उपस्थिति या काफ्का प्रणाली में दलाल की विफलता के बारे में सूचित करने के लिए किया जाता है। दलाल की उपस्थिति या विफलता के बारे में ज़ुकेर द्वारा प्राप्त अधिसूचना के अनुसार, प्रो-ड्यूरर और उपभोक्ता निर्णय लेते हैं और कुछ अन्य ब्रोकर के साथ अपने कार्य का समन्वय शुरू करते हैं।

3

Producers

निर्माता दलालों को डेटा धक्का देते हैं। जब नया ब्रोकर शुरू किया जाता है, तो सभी निर्माता इसे खोजते हैं और स्वचालित रूप से उस नए ब्रोकर को संदेश भेजते हैं। काफ्का निर्माता ब्रोकर से पावती के लिए इंतजार नहीं करता है और जितनी तेजी से ब्रोकर संभाल सकता है, उतने संदेश भेजता है।

4

Consumers

चूंकि काफ्का दलाल स्टेटलेस हैं, जिसका अर्थ है कि उपभोक्ता को यह सुनिश्चित करना होगा कि विभाजन ऑफसेट का उपयोग करके कितने संदेशों का उपभोग किया गया है। यदि उपभोक्ता एक विशेष संदेश को स्वीकार करता है, तो इसका अर्थ है कि उपभोक्ता ने सभी पूर्व संदेशों का उपभोग किया है। उपभोक्ता ब्रोकर को उपभोग करने के लिए तैयार बाइट्स का एक अतुल्यकालिक पुल अनुरोध जारी करता है। उपभोक्ता एक ऑफसेट मूल्य की आपूर्ति करके विभाजन में किसी भी बिंदु पर रिवाइंड या छोड़ सकते हैं। उपभोक्ता ऑफसेट मूल्य को चिड़ियाघर कीपर द्वारा अधिसूचित किया जाता है।

अब तक, हमने काफ्का की मुख्य अवधारणाओं पर चर्चा की। आइए अब काफ्का के वर्कफ़्लो पर कुछ प्रकाश डालें।

काफ्का बस एक या अधिक विभाजन में विभाजित विषयों का एक संग्रह है। एक काफ्का विभाजन संदेशों का एक क्रमबद्ध रूप से क्रमबद्ध क्रम है, जहां प्रत्येक संदेश को उनके सूचकांक द्वारा पहचाना जाता है (जिसे ऑफसेट कहा जाता है)। एक काफ्का क्लस्टर में सभी डेटा विभाजन का असंबद्ध संघ है। आने वाले संदेश एक विभाजन के अंत में लिखे गए हैं और संदेश उपभोक्ताओं द्वारा क्रमिक रूप से पढ़े जाते हैं। विभिन्न दलालों को संदेश दोहराकर स्थायित्व प्रदान किया जाता है।

काफ्का तेज, विश्वसनीय, निरंतर, दोष-सहिष्णुता और शून्य डाउनटाइम तरीके से पब-उप और कतार आधारित संदेश प्रणाली प्रदान करता है। दोनों ही मामलों में, निर्माता केवल एक विषय पर संदेश भेजते हैं और उपभोक्ता अपनी आवश्यकता के आधार पर किसी भी प्रकार के संदेश प्रणाली को चुन सकता है। हमें यह समझने के लिए कि उपभोक्ता अपनी पसंद का मैसेजिंग सिस्टम कैसे चुन सकते हैं, अगले भाग में दिए गए चरणों का पालन करें।

पब-सब मैसेजिंग का वर्कफ़्लो

निम्नलिखित पब-सब मैसेजिंग का चरणवार वर्कफ़्लो है -

  • निर्माता नियमित अंतराल पर एक विषय पर संदेश भेजते हैं।

  • Kafka ब्रोकर उस विशेष विषय के लिए कॉन्फ़िगर किए गए विभाजनों में सभी संदेशों को संग्रहीत करता है। यह सुनिश्चित करता है कि संदेश विभाजन के बीच समान रूप से साझा किए गए हैं। यदि निर्माता दो संदेश भेजता है और दो विभाजन होते हैं, तो काफ्का पहले विभाजन में एक संदेश और दूसरे विभाजन में दूसरा संदेश संग्रहीत करेगा।

  • उपभोक्ता एक विशिष्ट विषय की सदस्यता लेता है।

  • एक बार जब उपभोक्ता किसी विषय की सदस्यता ले लेता है, तो काफ्का उपभोक्ता को विषय की वर्तमान ऑफसेट प्रदान करेगा और ज़ुकाइपर पहनावा में ऑफसेट को भी बचाता है।

  • उपभोक्ता नए संदेशों के लिए एक नियमित अंतराल (जैसे 100 एमएस) में काफ्का का अनुरोध करेगा।

  • एक बार काफ्का उत्पादकों से संदेश प्राप्त करता है, यह इन संदेशों को उपभोक्ताओं को अग्रेषित करता है।

  • उपभोक्ता संदेश प्राप्त करेगा और इसे संसाधित करेगा।

  • संदेशों के संसाधित होने के बाद, उपभोक्ता काफ्का दलाल को एक पावती भेजेगा।

  • एक बार जब काफ्का को एक पावती मिल जाती है, तो वह ऑफसेट को नए मूल्य में बदल देता है और इसे ज़ूकीपर में अपडेट करता है। चूंकि ज़ुकाइटर में ऑफ़सेट बनाए जाते हैं, इसलिए उपभोक्ता सर्वर के दौरान भी अगले संदेश को सही ढंग से पढ़ सकता है।

  • यह उपरोक्त प्रवाह तब तक दोहराएगा जब तक कि उपभोक्ता अनुरोध को रोक नहीं देता है।

  • उपभोक्ता के पास किसी भी समय किसी विषय की वांछित ऑफ़र्स को रिवाइंड / स्किप करने का विकल्प होता है और बाद के सभी संदेशों को पढ़ सकता है।

कतार मैसेजिंग / उपभोक्ता समूह का वर्कफ़्लो

एक एकल उपभोक्ता के बजाय एक कतार संदेश प्रणाली में, एक ही समूह ID वाले उपभोक्ताओं का समूह किसी विषय की सदस्यता लेगा। सरल शब्दों में, एक ही ग्रुप आईडी वाले किसी विषय की सदस्यता लेने वाले उपभोक्ताओं को एक ही समूह माना जाता है और उनके बीच संदेश साझा किए जाते हैं। आइए हम इस प्रणाली के वास्तविक वर्कफ़्लो की जाँच करें।

  • निर्माता एक विषय के लिए एक नियमित अंतराल में संदेश भेजते हैं।

  • कफ़्का उस विशेष विषय के लिए पहले के परिदृश्य के समान कॉन्फ़िगर किए गए विभाजनों में सभी संदेशों को संग्रहीत करता है।

  • एक एकल उपभोक्ता एक विशिष्ट विषय की सदस्यता लेता है, समूह -1 के रूप में ग्रुप आईडी के साथ टॉपिक -01 ग्रहण करता है ।

  • कफ़्का उपभोक्ता के साथ उसी तरह से बातचीत करता है जैसे पब-सब मैसेजिंग जब तक नया उपभोक्ता एक ही विषय, टॉपिक -01 उसी ग्रुप आईडी के साथ ग्रुप -1 के रूप में सदस्यता नहीं लेता है ।

  • एक बार जब नया उपभोक्ता आता है, तो काफ्का अपने ऑपरेशन को साझा करने के लिए स्विच करता है और दोनों उपभोक्ताओं के बीच डेटा साझा करता है। यह बंटवारा तब तक चलेगा जब तक कि उस विशेष विषय के लिए कॉन्फ़िगर किए गए विभाजन की संख्या तक पहुँचने वाले की संख्या नहीं हो जाती।

  • एक बार जब उपभोक्ता की संख्या विभाजन से अधिक हो जाती है, तो नया उपभोक्ता तब तक कोई और संदेश प्राप्त नहीं करेगा, जब तक कि कोई भी मौजूदा उपभोक्ता अनिर्णायक न हो। यह परिदृश्य उत्पन्न होता है क्योंकि काफ्का में प्रत्येक उपभोक्ता को एक विभाजन का एक न्यूनतम सौंपा जाएगा और एक बार सभी विभाजन मौजूदा उपभोक्ताओं को सौंपे जाने के बाद, नए उपभोक्ताओं को इंतजार करना होगा।

  • इस सुविधा को उपभोक्ता समूह भी कहा जाता है । उसी तरह, काफ्का दोनों प्रणालियों को बहुत सरल और कुशल तरीके से प्रदान करेगा।

चिड़ियाघर कीपर की भूमिका

Apache Kafka की एक महत्वपूर्ण निर्भरता Apache Zookeeper है, जो एक वितरित कॉन्फ़िगरेशन और सिंक्रनाइज़ेशन सेवा है। ज़ूकीफ़र काफ्का दलालों और उपभोक्ताओं के बीच समन्वय इंटरफ़ेस के रूप में कार्य करता है। काफ्का सर्वर एक ज़ुकीपर क्लस्टर के माध्यम से जानकारी साझा करता है। कफ़का ज़ूकीपर में बुनियादी मेटाडेटा संग्रहीत करता है जैसे कि विषयों, दलालों, उपभोक्ता ऑफसेट (कतार पाठकों) के बारे में जानकारी और इतने पर।

चूंकि सभी महत्वपूर्ण जानकारी ज़ुकाइपर में संग्रहीत होती है और यह सामान्य रूप से इस डेटा को अपने पहनावा में भरती है, काफ्का ब्रोकर / ज़ूकीपर की विफलता काफ्का क्लस्टर की स्थिति को प्रभावित नहीं करती है। एक बार ज़ूकीर के पुनरारंभ होने पर, काफ्का राज्य को बहाल करेगा। यह काफ्का के लिए शून्य डाउनटाइम देता है। कफ़्का दलाल के बीच नेता का चुनाव नेता की विफलता की स्थिति में ज़ुकीपर का उपयोग करके भी किया जाता है।

ज़ुकीपर के बारे में अधिक जानने के लिए, ज़ूकीपर को देखें

हमें अगले अध्याय में अपनी मशीन पर जावा, ज़ूकिपर, और काफ्का स्थापित करने के तरीके पर आगे जारी रखना चाहिए।

अपनी मशीन पर जावा को स्थापित करने के चरण निम्नलिखित हैं।

चरण 1 - जावा स्थापना का सत्यापन

उम्मीद है कि आपने अभी-अभी अपनी मशीन पर जावा को स्थापित किया है, इसलिए आप केवल निम्न कमांड का उपयोग करके इसे सत्यापित करते हैं।

$ java -version

यदि जावा आपकी मशीन पर सफलतापूर्वक स्थापित है, तो आप स्थापित जावा का संस्करण देख सकते हैं।

चरण 1.1 - JDK डाउनलोड करें

यदि जावा डाउनलोड नहीं किया गया है, तो कृपया निम्न लिंक पर जाकर JDK का नवीनतम संस्करण डाउनलोड करें और नवीनतम संस्करण डाउनलोड करें।

http://www.oracle.com/technetwork/java/javase/downloads/index.html

अब नवीनतम संस्करण JDK 8u 60 है और फ़ाइल "jdk-8u60-linux-x64.tar.gz" है। कृपया अपनी मशीन पर फ़ाइल डाउनलोड करें।

चरण 1.2 - फ़ाइलें निकालें

आम तौर पर, डाउनलोड की जा रही फ़ाइलों को डाउनलोड फ़ोल्डर में संग्रहीत किया जाता है, इसे सत्यापित करें और निम्न आदेशों का उपयोग करके टार सेटअप निकालें।

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

चरण 1.3 - ऑप्ट निर्देशिका में ले जाएँ

जावा को सभी उपयोगकर्ताओं के लिए उपलब्ध कराने के लिए, निकाले गए जावा सामग्री को usr / स्थानीय / जावा / फ़ोल्डर में ले जाएँ।

$ su
password: (type password of root user)
$ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/

चरण १.४ - पथ निर्धारित करें

पथ और JAVA_HOME चर सेट करने के लिए, ~ / .bashrc फ़ाइल में निम्न कमांड जोड़ें।

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

अब सभी परिवर्तनों को वर्तमान चल रही प्रणाली में लागू करें।

$ source ~/.bashrc

चरण 1.5 - जावा अल्टरनेटिव

जावा अल्टरनेटिव्स को बदलने के लिए निम्न कमांड का उपयोग करें।

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Step 1.6 - अब चरण 1 में बताई गई सत्यापन कमांड (जावा-वर्सन) का उपयोग करके जावा को सत्यापित करें।

चरण 2 - चिड़ियाघर कीपर फ्रेमवर्क स्थापना

चरण 2.1 - चिड़ियाघरकीपर डाउनलोड करें

अपनी मशीन पर चिड़ियाघरकीपर फ्रेमवर्क स्थापित करने के लिए, निम्न लिंक पर जाएं और चिड़ियाघरकीपर का नवीनतम संस्करण डाउनलोड करें।

http://zookeeper.apache.org/releases.html

अब तक, ज़ूकीपर का नवीनतम संस्करण 3.4.6 (ज़ूकीपर-3.4.6.tar.gz) है।

चरण 2.2 - टार फ़ाइल निकालें

निम्नलिखित कमांड का उपयोग करके टार फाइल को निकालें

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6
$ mkdir data

चरण 2.3 - कॉन्फ़िगरेशन फ़ाइल बनाएँ

Open कॉन्फ़िगरेशन फ़ाइल जिसका नाम vi / zoo.cfg है , कमांड vi "conf / zoo.cfg" और शुरुआती बिंदु के रूप में सेट करने के लिए निम्नलिखित सभी मापदंडों का उपयोग करता है।

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

एक बार कॉन्फ़िगरेशन फ़ाइल सफलतापूर्वक सहेज ली गई है और फिर से टर्मिनल पर वापस आ जाती है, तो आप ज़ूकीपर सर्वर शुरू कर सकते हैं।

चरण 2.4 - ज़ूकीपर सर्वर प्रारंभ करें

$ bin/zkServer.sh start

इस कमांड को निष्पादित करने के बाद, आपको नीचे दी गई प्रतिक्रिया मिल जाएगी -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED

चरण 2.5 - सीएलआई शुरू करें

$ bin/zkCli.sh

उपरोक्त कमांड टाइप करने के बाद, आप ज़ूकीपर सर्वर से जुड़ जाएंगे और नीचे की प्रतिक्रिया प्राप्त करेंगे।

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

चरण 2.6 - ज़ूकीपर सर्वर बंद करो

सर्वर से कनेक्ट करने और सभी ऑपरेशन करने के बाद, आप zookeeper सर्वर को निम्न कमांड के साथ रोक सकते हैं -

$ bin/zkServer.sh stop

अब आपने अपनी मशीन पर Java और ZooKeeper सफलतापूर्वक स्थापित कर लिया है। आइये देखते हैं अपाचे काफ्का को स्थापित करने के चरण।

चरण 3 - अपाचे काफ्का स्थापना

हमें अपनी मशीन पर काफ्का स्थापित करने के लिए निम्नलिखित चरणों के साथ जारी रखें।

चरण 3.1 - काफ्का डाउनलोड करें

अपनी मशीन पर काफ्का स्थापित करने के लिए, नीचे दिए गए लिंक पर क्लिक करें -

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

अब नवीनतम संस्करण है, - kafka_2.11_0.9.0.0.tgz आपकी मशीन पर डाउनलोड किया जाएगा।

चरण 3.2 - टार फ़ाइल को निकालें

निम्नलिखित कमांड का उपयोग करके टार फाइल को निकालें -

$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

अब आपने अपनी मशीन पर काफ्का का नवीनतम संस्करण डाउनलोड किया है।

चरण 3.3 - सर्वर प्रारंभ करें

आप निम्नलिखित कमांड देकर सर्वर शुरू कर सकते हैं -

$ bin/kafka-server-start.sh config/server.properties

सर्वर शुरू होने के बाद, आप अपनी स्क्रीन पर नीचे की प्रतिक्रिया देखेंगे -

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

चरण 4 - सर्वर बंद करो

सभी ऑपरेशन करने के बाद, आप निम्न कमांड का उपयोग करके सर्वर को बंद कर सकते हैं -

$ bin/kafka-server-stop.sh config/server.properties

अब जब हम पहले ही काफ्का स्थापना पर चर्चा कर चुके हैं, तो हम अगले अध्याय में काफ्का पर बुनियादी संचालन करने का तरीका सीख सकते हैं।

पहले हमें सिंगल नोड-सिंगल ब्रोकर कॉन्फ़िगरेशन लागू करना शुरू करते हैं और फिर हम अपने सेटअप को सिंगल नोड-मल्टीपल ब्रोकर्स कॉन्फ़िगरेशन में माइग्रेट करेंगे।

उम्मीद है कि आपने अब तक अपनी मशीन पर जावा, ज़ूकीपर और काफ्का स्थापित किया होगा। काफ्का क्लस्टर सेटअप में जाने से पहले, आपको अपना चिड़ियाघर कीपर शुरू करना होगा क्योंकि काफ्का क्लस्टर ज़ूकीपर का उपयोग करता है।

ZooKeeper प्रारंभ करें

एक नया टर्मिनल खोलें और निम्न कमांड टाइप करें -

bin/zookeeper-server-start.sh config/zookeeper.properties

काफ्का ब्रोकर शुरू करने के लिए, निम्न कमांड टाइप करें -

bin/kafka-server-start.sh config/server.properties

काफ्का ब्रोकर शुरू करने के बाद, ज़ूकेपर टर्मिनल पर कमांड जेपी टाइप करें और आपको निम्नलिखित प्रतिक्रिया दिखाई देगी -

821 QuorumPeerMain
928 Kafka
931 Jps

अब आप टर्मिनल पर चलने वाले दो डेमन को देख सकते हैं जहाँ क्वोरमपेरमैन ज़ूकेर डेमन है और दूसरा काफ्का डेमन है।

सिंगल नोड-सिंगल ब्रोकर कॉन्फ़िगरेशन

इस कॉन्फ़िगरेशन में आपके पास एक एकल ज़ूकीपर और ब्रोकर आईडी उदाहरण है। इसे कॉन्फ़िगर करने के चरण निम्नलिखित हैं -

Creating a Kafka Topic- Kafka सर्वर पर विषय बनाने के लिए kafka-topics.sh नाम की एक कमांड लाइन उपयोगिता प्रदान करता है । नया टर्मिनल खोलें और नीचे का उदाहरण लिखें।

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

हमने केवल हैलो-काफ्का नामक एक एकल विभाजन और एक प्रतिकृति कारक के साथ एक विषय बनाया । उपरोक्त निर्मित आउटपुट निम्न आउटपुट के समान होगा -

Output- हैलो-काफ्का विषय बनाया

विषय बन जाने के बाद, आप काफ्का ब्रोकर टर्मिनल विंडो में नोटिफिकेशन प्राप्त कर सकते हैं और “/ tmp / kafka-लॉग /” में निर्दिष्ट विषय के लिए लॉग इन कर सकते हैं।

विषयों की सूची

काफ्का सर्वर में विषयों की एक सूची प्राप्त करने के लिए, आप निम्नलिखित कमांड का उपयोग कर सकते हैं -

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

चूंकि हमने एक विषय बनाया है, यह केवल हैलो-काफ्का को सूचीबद्ध करेगा । मान लीजिए, यदि आप एक से अधिक विषय बनाते हैं, तो आपको विषय के नाम आउटपुट में मिलेंगे।

संदेश भेजने के लिए निर्माता शुरू करें

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

उपरोक्त वाक्य रचना से, निर्माता कमांड लाइन क्लाइंट के लिए दो मुख्य मापदंडों की आवश्यकता होती है -

Broker-list- दलालों की सूची जिसे हम संदेश भेजना चाहते हैं। इस मामले में हमारे पास केवल एक दलाल है। Config / server.properties फ़ाइल में ब्रोकर पोर्ट आईडी है, क्योंकि हम जानते हैं कि हमारा ब्रोकर पोर्ट 9092 पर सुन रहा है, इसलिए आप इसे सीधे निर्दिष्ट कर सकते हैं।

विषय नाम - यहाँ विषय के नाम के लिए एक उदाहरण है।

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

निर्माता स्टड से इनपुट पर इंतजार करेगा और काफ्का क्लस्टर में प्रकाशित करेगा। डिफ़ॉल्ट रूप से, प्रत्येक नई पंक्ति को एक नए संदेश के रूप में प्रकाशित किया जाता है, फिर डिफ़ॉल्ट निर्माता गुण कॉन्फ़िगर / निर्माता में निर्दिष्ट होते हैं । अब आप टर्मिनल में कुछ संदेश टाइप कर सकते हैं जैसा कि नीचे दिखाया गया है।

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

संदेश प्राप्त करने के लिए उपभोक्ता शुरू करें

निर्माता के समान, डिफ़ॉल्ट उपभोक्ता गुण config / Consumer.proper-संबंधों फ़ाइल में निर्दिष्ट हैं । एक नया टर्मिनल खोलें और संदेशों का उपभोग करने के लिए नीचे दिए गए सिंटैक्स टाइप करें।

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

अंत में, आप निर्माता के टर्मिनल से संदेश दर्ज कर सकते हैं और उन्हें उपभोक्ता के टर्मिनल में प्रदर्शित होते हुए देख सकते हैं। अब तक, आपके पास एकल ब्रोकर के साथ एकल नोड क्लस्टर पर बहुत अच्छी समझ है। आइए अब हम कई दलालों के कॉन्फ़िगरेशन पर चलते हैं।

सिंगल नोड-मल्टीपल ब्रोकर्स कॉन्फ़िगरेशन

कई ब्रोकर्स क्लस्टर सेटअप पर जाने से पहले, पहले अपना चिड़ियाघरकीपर सर्वर शुरू करें।

Create Multiple Kafka Brokers- हमारे पास पहले से ही con-fig / server.properties में एक काफ्का ब्रोकर उदाहरण है। अब हमें कई ब्रोकर इंस्टेंसेस की आवश्यकता है, इसलिए मौजूदा server.prop-erties फाइल को दो नई कॉन्फिग फाइल में कॉपी करें और इसे server-one.properties और server-two.prop-erties नाम दें। फिर दोनों नई फ़ाइलों को संपादित करें और निम्नलिखित परिवर्तन असाइन करें -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers- तीन सर्वरों पर सभी परिवर्तन किए जाने के बाद, प्रत्येक ब्रोकर को एक-एक करके शुरू करने के लिए तीन नए टर्मिनल खोलें।

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

अब हमारे पास मशीन पर चलने वाले तीन अलग-अलग दलाल हैं। टाइप करके सभी डेमों की जांच करने के लिए अपने आप से प्रयास करेंjps चिड़ियाघरकीपर टर्मिनल पर, फिर आपको प्रतिक्रिया दिखाई देगी।

एक विषय बनाना

आइए हम इस विषय के लिए तीन के रूप में प्रतिकृति कारक मान प्रदान करें क्योंकि हमारे पास तीन अलग-अलग दलाल चल रहे हैं। यदि आपके पास दो दलाल हैं, तो निर्दिष्ट प्रतिकृति मान दो होगा।

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

वर्णन करें आदेश जो दलाल वर्तमान बनाई विषय पर सुन रहा है जैसा कि नीचे दिखाया जाँच करने के लिए प्रयोग किया जाता है -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

उपरोक्त आउटपुट से, हम यह निष्कर्ष निकाल सकते हैं कि पहली पंक्ति में सभी विभाजनों का सारांश दिया गया है, जो विषय का नाम, विभाजन गणना और प्रतिकृति कारक है जो हमने पहले ही चुना है। दूसरी पंक्ति में, प्रत्येक नोड विभाजन के एक बेतरतीब ढंग से चयनित भाग के लिए नेता होगा।

हमारे मामले में, हम देखते हैं कि हमारा पहला ब्रोकर (दलाल 0 के साथ) नेता है। तब प्रतिकृतियां: 0,2,1 का मतलब है कि सभी दलालों विषय को दोहराने अंत में Isr का सेट है में-सिंक प्रतिकृतियां। खैर, यह प्रतिकृतियों का सबसेट है जो वर्तमान में जीवित हैं और नेता द्वारा पकड़ा जाता है।

संदेश भेजने के लिए निर्माता शुरू करें

यह प्रक्रिया एकल ब्रोकर सेटअप की तरह ही रहती है।

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

संदेश प्राप्त करने के लिए उपभोक्ता शुरू करें

यह प्रक्रिया वैसी ही रहती है जैसी सिंगल ब्रोकर सेटअप में दिखाई जाती है।

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

मूल विषय संचालन

इस अध्याय में हम विभिन्न बुनियादी विषय संचालन पर चर्चा करेंगे।

एक विषय को संशोधित करना

जैसा कि आप पहले ही समझ गए हैं कि काफ्का क्लस्टर में एक विषय कैसे बनाया जाए। अब हम निम्न कमांड का उपयोग करके एक निर्मित विषय को संशोधित करते हैं

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

एक विषय हटाना

किसी विषय को हटाने के लिए, आप निम्न सिंटैक्स का उपयोग कर सकते हैं।

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −इसका कोई प्रभाव नहीं पड़ेगा अगर delete.topic.enable सत्य पर सेट नहीं है

जावा क्लाइंट का उपयोग करके संदेशों को प्रकाशित करने और उपभोग करने के लिए एक एप्लिकेशन बनाएं। कफका उत्पादक ग्राहक में निम्नलिखित एपीआई के होते हैं।

काफ्काप्रोड्यूसर एपीआई

आइए इस खंड में काफ्का निर्माता एपीआई के सबसे महत्वपूर्ण सेट को समझते हैं। KafkaProducer API का मध्य भाग KafkaProducer वर्ग है। KafkaProducer वर्ग निम्न विधियों के साथ अपने निर्माता में एक Kafka दलाल को जोड़ने का विकल्प प्रदान करता है।

  • KafkaProducer वर्ग किसी विषय पर अतुल्यकालिक संदेश भेजने के लिए विधि भेजता है। भेजें () का हस्ताक्षर इस प्रकार है

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - निर्माता भेजे जाने के लिए रिकॉर्ड के एक बफर का प्रबंधन करता है।

  • Callback - जब सर्वर द्वारा रिकॉर्ड को स्वीकार कर लिया गया है तो सर्वर निष्पादित करने के लिए उपयोगकर्ता को कॉलबैक निष्पादित करता है (अशक्त कोई कॉलबैक इंगित करता है)।

  • KafkaProducer वर्ग वास्तव में पूरा हो गया है पहले से भेजे गए संदेशों को सुनिश्चित करने के लिए एक फ्लश विधि प्रदान करता है। फ्लश विधि का सिंटैक्स इस प्रकार है -

public void flush()
  • काफ्काप्रोड्यूसर वर्ग पार्टीशन विधि प्रदान करता है, जो किसी दिए गए विषय के लिए विभाजन मेटाडेटा प्राप्त करने में मदद करता है। यह कस्टम विभाजन के लिए उपयोग किया जा सकता है। इस विधि का हस्ताक्षर इस प्रकार है -

public Map metrics()

यह निर्माता द्वारा बनाए गए आंतरिक मैट्रिक्स का नक्शा लौटाता है।

  • सार्वजनिक शून्य बंद () - KafkaProducer वर्ग सभी पहले भेजे गए अनुरोधों को पूरा होने तक करीब विधि ब्लॉक प्रदान करता है।

निर्माता एपीआई

निर्माता एपीआई का मध्य भाग निर्माता वर्ग है। निर्माता वर्ग निम्नलिखित तरीकों से अपने निर्माता में काफ्का दलाल को जोड़ने का विकल्प प्रदान करता है।

निर्माता वर्ग

निर्माता वर्ग को भेजने की विधि प्रदान करता है send निम्नलिखित हस्ताक्षर का उपयोग करके या तो एकल या कई विषयों के लिए संदेश।

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

निर्माता दो प्रकार के होते हैं - Sync तथा Async

समान API कॉन्फ़िगरेशन सिंक निर्माता पर भी लागू होता है । उनके बीच का अंतर एक सिंक निर्माता है जो सीधे संदेश भेजता है, लेकिन पृष्ठभूमि में संदेश भेजता है। जब आप उच्च थ्रूपुट चाहते हैं, तो Async निर्माता को प्राथमिकता दी जाती है। पिछली रिलीज में 0.8 की तरह, एक async निर्माता के पास त्रुटि हैंडलर को पंजीकृत करने के लिए भेजने () के लिए कॉलबैक नहीं है। यह केवल वर्तमान रिलीज़ में 0.9 में उपलब्ध है।

सार्वजनिक शून्य के करीब ()

निर्माता वर्ग प्रदान करता है close सभी काफ्का ब्रो-केर्स के लिए निर्माता पूल कनेक्शन बंद करने की विधि।

कॉन्फ़िगरेशन सेटिंग्स

प्रोड्यूसर एपीआई की मुख्य कॉन्फ़िगरेशन सेटिंग्स बेहतर अंडर-स्टैंडिंग के लिए निम्न तालिका में सूचीबद्ध हैं -

S.No कॉन्फ़िगरेशन सेटिंग्स और विवरण
1

client.id

निर्माता आवेदन की पहचान करता है

2

producer.type

या तो सिंक या async

3

acks

एक्स कॉन्फिग्यूशन निर्माता के अनुरोधों के तहत मानदंड को नियंत्रित करता है जो पूर्ण रूप से को-साइडेड हैं।

4

retries

यदि निर्माता अनुरोध विफल हो जाता है, तो स्वचालित रूप से विशिष्ट मूल्य के साथ पुन: प्रयास करें।

5

bootstrap.servers

दलालों की बूटस्ट्रैपिंग सूची।

6

linger.ms

यदि आप अनुरोधों की संख्या को कम करना चाहते हैं तो आप linger.ms को कुछ मूल्य से अधिक के लिए सेट कर सकते हैं।

7

key.serializer

सीरियल इंटरफ़ेस के लिए कुंजी।

8

value.serializer

धारावाहिक इंटरफ़ेस के लिए मूल्य।

9

batch.size

बफर आकार।

10

buffer.memory

बफ़रिंग के लिए निर्माता को उपलब्ध स्मृति की कुल मात्रा को नियंत्रित करता है।

निर्माता। एपीआई

ProducerRecord एक कुंजी / मान युग्म है, जो निम्न हस्ताक्षर का उपयोग करके विभाजन, कुंजी और मूल्य जोड़े के साथ एक रिकॉर्ड बनाने के लिए Kafka क्लस्टर में भेजा जाता है ।roducerRecord वर्ग निर्माता।

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - उपयोगकर्ता परिभाषित विषय नाम जो रिकॉर्ड करने के लिए संलग्न होगा।

  • Partition - विभाजन की गिनती

  • Key - कुंजी जो रिकॉर्ड में शामिल होगी।

  • Value - रिकॉर्ड सामग्री
public ProducerRecord (string topic, k key, v value)

ProducerRecord क्लास कंस्ट्रक्टर का उपयोग कुंजी, मूल्य जोड़े और विभाजन के बिना रिकॉर्ड बनाने के लिए किया जाता है।

  • Topic - रिकॉर्ड असाइन करने के लिए एक विषय बनाएं।

  • Key - रिकॉर्ड के लिए कुंजी।

  • Value - रिकॉर्ड सामग्री।

public ProducerRecord (string topic, v value)

ProducerRecord वर्ग विभाजन और कुंजी के बिना एक रिकॉर्ड बनाता है।

  • Topic - एक विषय बनाएँ।

  • Value - रिकॉर्ड सामग्री।

ProducerRecord वर्ग विधियों को निम्न तालिका में सूचीबद्ध किया गया है -

S.No क्लास के तरीके और विवरण
1

public string topic()

विषय रिकॉर्ड के लिए अपील करेंगे।

2

public K key()

कुंजी जिसे रिकॉर्ड में शामिल किया जाएगा। यदि ऐसी कोई कुंजी नहीं है, तो नल को फिर से चालू कर दिया जाएगा।

3

public V value()

सामग्री रिकॉर्ड करें।

4

partition()

रिकॉर्ड के लिए विभाजन की गणना

SimpleProducer आवेदन

एप्लिकेशन बनाने से पहले, सबसे पहले ZooKeeper और Kafka ब्रोकर को प्रारंभ करें और फिर create कमांड कमांड का उपयोग करके काफ्का ब्रोकर में अपना विषय बनाएं। उसके बाद Sim-pleProducer.java नाम से एक java class बनाएँ और निम्नलिखित कोडिंग में टाइप करें।

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation - एप्लिकेशन को निम्न कमांड का उपयोग करके संकलित किया जा सकता है।

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution - एप्लिकेशन को निम्न कमांड का उपयोग करके निष्पादित किया जा सकता है।

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

सरल उपभोक्ता उदाहरण

अब तक हमने काफ्का क्लस्टर को संदेश भेजने के लिए एक निर्माता बनाया है। अब कफका क्लस्टर के रूप में संदेशों का उपभोग करने के लिए एक उपभोक्ता बनाते हैं। KafkaConsumer API का उपयोग काफ्का क्लस्टर के संदेशों का उपभोग करने के लिए किया जाता है। KafkaConsumer वर्ग निर्माता नीचे परिभाषित किया गया है।

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - कंज्यूमर कॉन्फिग का मैप वापस करें।

KafkaConsumer वर्ग में निम्नलिखित महत्वपूर्ण विधियाँ हैं जो नीचे दी गई तालिका में सूचीबद्ध हैं।

S.No विधि और विवरण
1

public java.util.Set<TopicPar-tition> assignment()

वर्तमान में con-sumer द्वारा निर्दिष्ट विभाजन का सेट प्राप्त करें।

2

public string subscription()

डायनामिक रूप से हस्ताक्षरित विभाजन प्राप्त करने के लिए विषयों की दी गई सूची की सदस्यता लें।

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

डायनामिक रूप से हस्ताक्षरित विभाजन प्राप्त करने के लिए विषयों की दी गई सूची की सदस्यता लें।

4

public void unsubscribe()

विभाजन की दी गई सूची से विषयों को अनसब्सक्राइब करें।

5

public void sub-scribe(java.util.List<java.lang.String> topics)

डायनामिक रूप से हस्ताक्षरित विभाजन प्राप्त करने के लिए विषयों की दी गई सूची की सदस्यता लें। यदि विषयों की दी गई सूची खाली है, तो इसे सदस्यता समाप्त () के समान माना जाता है।

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

तर्क पैटर्न नियमित अभिव्यक्ति के प्रारूप में सदस्यता पैटर्न को संदर्भित करता है और श्रोता तर्क को सदस्यता पैटर्न से सूचनाएं मिलती हैं।

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

मैन्युअल रूप से ग्राहक को विभाजन की एक सूची सौंपें।

8

poll()

सब्स्क्राइब / असाइन किए गए API में से किसी एक का उपयोग करके निर्दिष्ट किए गए विषयों या विभाजनों के लिए डेटा प्राप्त करें। यदि डेटा के लिए मतदान से पहले विषयों की सदस्यता नहीं ली जाती है, तो यह त्रुटि लौटाएगा।

9

public void commitSync()

विषयों और विभाजनों की सभी उप-स्क्राइब सूची के लिए पिछले चुनाव () पर कमेटी ऑफसेट लौटी। एक ही ऑपरेशन को कमिशन () के लिए लागू किया जाता है।

10

public void seek(TopicPartition partition, long offset)

वर्तमान ऑफसेट मूल्य प्राप्त करें जो उपभोक्ता अगले पोल () विधि पर उपयोग करेगा।

1 1

public void resume()

रुके हुए विभाजन फिर से शुरू करें।

12

public void wakeup()

उपभोक्ता को जगाएं।

ConsumerRecord API

ConsumerRecord API का उपयोग काफ्का क्लस्टर से रिकॉर्ड प्राप्त करने के लिए किया जाता है। इस एपीआई में एक विषय का नाम, विभाजन संख्या, जिसमें से रिकॉर्ड प्राप्त किया जा रहा है और एक ऑफसेट जो कि काफ्का विभाजन में रिकॉर्ड को इंगित करता है। ConsumerRecord वर्ग का उपयोग विशिष्ट विषय नाम, विभाजन गणना और <कुंजी, मान> जोड़े के साथ एक उपभोक्ता रिकॉर्ड बनाने के लिए किया जाता है। इसके निम्नलिखित हस्ताक्षर हैं।

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - काफ्का क्लस्टर से प्राप्त उपभोक्ता रिकॉर्ड का विषय नाम।

  • Partition - विषय के लिए विभाजन।

  • Key - रिकॉर्ड की कुंजी, यदि कोई कुंजी मौजूद नहीं है तो शून्य वापस कर दी जाएगी।

  • Value - रिकॉर्ड सामग्री।

ConsumerRecords API

ConsumerRecords API ConsumerRecord के लिए एक कंटेनर के रूप में कार्य करता है। इस API का उपयोग किसी विशेष विषय के लिए प्रति विभाजन ConsumerRecord की सूची को रखने के लिए किया जाता है। इसके कंस्ट्रक्टर को नीचे परिभाषित किया गया है।

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - किसी विशेष विषय के लिए विभाजन का नक्शा लौटाएं।

  • Records - ConsumerRecord की सूची लौटाएं।

ConsumerRecords वर्ग में निम्नलिखित तरीके परिभाषित हैं।

S.No तरीके और विवरण
1

public int count()

सभी विषयों के लिए रिकॉर्ड की संख्या।

2

public Set partitions()

इस रिकॉर्ड सेट में डेटा के साथ विभाजन का सेट (यदि कोई डेटा वापस नहीं किया गया था तो सेट खाली है)।

3

public Iterator iterator()

Iterator आपको एक संग्रह के माध्यम से चक्र प्राप्त करने, तत्वों को प्राप्त करने या फिर से स्थानांतरित करने में सक्षम बनाता है।

4

public List records()

दिए गए विभाजन के रिकॉर्ड की सूची प्राप्त करें।

कॉन्फ़िगरेशन सेटिंग्स

उपभोक्ता क्लाइंट API मुख्य कॉन्फ़िगरेशन सेटिंग्स के लिए कॉन्फ़िगरेशन सेटिंग्स नीचे सूचीबद्ध हैं -

S.No सेटिंग्स और विवरण
1

bootstrap.servers

दलालों की बूटस्ट्रैपिंग सूची।

2

group.id

एक समूह के लिए एक व्यक्तिगत उपभोक्ता असाइन करता है।

3

enable.auto.commit

यदि मान सत्य है, तो ऑफ़सेट के लिए ऑटो कमिट सक्षम करें, अन्यथा प्रतिबद्ध नहीं है।

4

auto.commit.interval.ms

वापसी कितनी बार उपभोग किए गए ऑफ़सेट्स ज़ूकीपर को लिखे गए हैं।

5

session.timeout.ms

इंगित करता है कि संदेश देने और जारी रखने के लिए कितने मिलीसेकंड काफ्का ज़ूकेपर के अनुरोध का जवाब देने (पढ़ने या लिखने) का इंतजार करेगा।

SimpleConsumer आवेदन

निर्माता के आवेदन के चरण यहां समान हैं। सबसे पहले, अपने चिड़ियाघरकीपर और काफ्का दलाल को शुरू करें। फिर SimpleCon-sumer.java नाम के जावा वर्ग के साथ एक SimpleConsumer एप्लिकेशन बनाएं और निम्न कोड टाइप करें।

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - एप्लिकेशन को निम्न कमांड का उपयोग करके संकलित किया जा सकता है।

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − एप्लिकेशन को निम्न आदेश का उपयोग करके निष्पादित किया जा सकता है

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input- निर्माता CLI खोलें और विषय पर कुछ संदेश भेजें। आप 'हेलो कंज्यूमर' के रूप में स्माइल इनपुट डाल सकते हैं।

Output - निम्नलिखित आउटपुट होगा।

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

उपभोक्ता समूह कफका विषयों से बहु-थ्रेडेड या बहु-मशीन खपत है।

उपभोक्ता समूह

  • उपभोक्ता एक ही group.id का उपयोग करके एक समूह में शामिल हो सकते हैं

  • एक समूह की अधिकतम समानता यह है कि समूह में उपभोक्ताओं की संख्या part विभाजन की संख्या नहीं है।

  • काफ्का एक समूह में उपभोक्ता को एक विषय के विभाजन को असाइन करता है, ताकि प्रत्येक विभाजन समूह में ठीक एक उपभोक्ता द्वारा खपत हो।

  • काफ्का गारंटी देता है कि एक संदेश केवल समूह में किसी एकल उपभोक्ता द्वारा पढ़ा जाता है।

  • उपभोक्ता उस संदेश को देख सकते हैं जिस क्रम में उन्हें लॉग में संग्रहीत किया गया था।

एक उपभोक्ता का फिर से संतुलन

अधिक प्रक्रियाओं / थ्रेड्स को जोड़ने से काफ्का को फिर से संतुलन मिलेगा। यदि कोई भी उपभोक्ता या ब्रोकर दिल की धड़कन को चिड़ियाघर कीपर में भेजने में विफल रहता है, तो इसे काफ्का क्लस्टर के माध्यम से फिर से कॉन्फ़िगर किया जा सकता है। इस पुनः संतुलन के दौरान, काफ्का उपलब्ध थ्रेड्स को उपलब्ध विभाजन प्रदान करेगा, संभवतः एक विभाजन को दूसरी प्रक्रिया में ले जाएगा।

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

संकलन

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

क्रियान्वयन

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

यहां हमने दो उपभोक्ताओं के साथ मेरे समूह के रूप में एक नमूना समूह नाम बनाया है । इसी तरह, आप समूह में अपने समूह और उपभोक्ताओं की संख्या बना सकते हैं।

इनपुट

निर्माता CLI खोलें और कुछ संदेश भेजें -

Test consumer group 01
Test consumer group 02

पहली प्रक्रिया का आउटपुट

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

दूसरी प्रक्रिया का आउटपुट

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

अब उम्मीद है कि आप Java क्लाइंट डेमो का उपयोग करके SimpleConsumer और ConsumeGroup समझ गए होंगे। अब आपके पास जावा क्लाइंट का उपयोग करके संदेश भेजने और प्राप्त करने के तरीके के बारे में एक विचार है। हमें अगले अध्याय में बड़ी डेटा तकनीकों के साथ काफ्का एकीकरण जारी रखना चाहिए।

इस अध्याय में, हम सीखेंगे कि अपाका स्टॉर्म के साथ काफ्का को कैसे एकीकृत किया जाए।

तूफान के बारे में

स्टॉर्म मूल रूप से बैकथन में नाथन मार्ज़ और टीम द्वारा बनाया गया था। थोड़े समय में, अपाचे स्टॉर्म वितरित वास्तविक समय प्रसंस्करण प्रणाली के लिए एक मानक बन गया जो आपको भारी मात्रा में डेटा संसाधित करने की अनुमति देता है। तूफान बहुत तेज़ है और एक बेंचमार्क ने इसे प्रति सेकंड प्रति नोडल संसाधित एक लाख टुपल्स पर देखा। अपाचे स्टॉर्म लगातार चलता है, कॉन्फ़िगर किए गए स्रोतों (स्पाउट्स) से डेटा का उपभोग करता है और प्रोसेसिंग पाइपलाइन (बोल्ट) के नीचे डेटा को पास करता है। कॉम-बाइन्ड, स्पाउट्स और बोल्ट्स एक टोपोलॉजी बनाते हैं।

तूफान के साथ एकीकरण

काफ्का और तूफान स्वाभाविक रूप से एक दूसरे के पूरक हैं, और उनका शक्तिशाली सहयोग तेजी से बढ़ते बड़े डेटा के लिए वास्तविक समय स्ट्रीमिंग एनालिटिक्स को सक्षम बनाता है। काफ्का और स्टॉर्म एकीकरण डेवलपर्स के लिए स्टॉर्म टोपोलॉजी से डेटा धाराओं को निगलना और प्रकाशित करना आसान बनाता है।

वैचारिक प्रवाह

टोंटी धाराओं का एक स्रोत है। उदाहरण के लिए, एक टोंटी एक काफ्का विषय से ट्यूपल्स को पढ़ सकती है और उन्हें एक धारा के रूप में उत्सर्जित कर सकती है। एक बोल्ट इनपुट धाराओं का उपयोग करता है, प्रक्रिया करता है और संभवतः नई धाराएं उत्सर्जित करता है। बोल्ट रनिंग फ़ंक्शंस, ट्यूपल्स को फ़िल्टर करने, स्ट्रीमिंग एग्रीगेशन, स्ट्रीमिंग जॉइनिंग, डेटाबेस से बात करने, और बहुत कुछ कर सकते हैं। एक तूफान टोपोलॉजी में प्रत्येक नोड समानांतर में निष्पादित होता है। एक टोपोलॉजी अनिश्चित काल तक चलती है जब तक आप इसे समाप्त नहीं करते। तूफान स्वचालित रूप से किसी भी असफल कार्यों को फिर से असाइन करेगा। इसके अतिरिक्त, तूफान गारंटी देता है कि कोई भी डेटा हानि नहीं होगी, भले ही मशीनें नीचे जाएं और संदेश गिराए जाएं।

आइए काफ्का-तूफान एकीकरण एपीआई के बारे में विस्तार से जानते हैं। स्टाफ़ के साथ काफ्का को एकीकृत करने के लिए तीन मुख्य वर्ग हैं। वे इस प्रकार हैं -

ब्रोकरहॉस्ट्स - ZkHosts & StaticHosts

ब्रोकरहोस्ट एक इंटरफ़ेस है और ZkHosts और StaticHosts इसके दो मुख्य कार्यान्वयन हैं। ZkHosts का उपयोग ज़ूकेर में विवरणों को बनाए रखने के द्वारा गतिशील रूप से काफ्का दलालों को ट्रैक करने के लिए किया जाता है, जबकि StaticHosts का उपयोग काफ्का दलालों और इसके विवरणों को मैन्युअल रूप से सेट करने के लिए किया जाता है। ZkHosts काफ्का ब्रोकर तक पहुंचने का सरल और तेज़ तरीका है।

ZkHosts का हस्ताक्षर इस प्रकार है -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

जहाँ ब्रोकरZStStr ज़ूकेर होस्ट है और ब्रोकरज़कपैथ काफ्का ब्रोकर विवरण को बनाए रखने के लिए ज़ूकीपर पथ है।

काफ्काकोफिग एपीआई

कफका क्लस्टर के लिए कॉन्फ़िगरेशन सेटिंग्स को परिभाषित करने के लिए इस एपीआई का उपयोग किया जाता है। काफ्का कॉन-अंजीर के हस्ताक्षर को निम्नानुसार परिभाषित किया गया है

public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts - ब्रोकरहोस्ट्स ZkHosts / StaticHosts हो सकते हैं।

    Topic - विषय का नाम।

SpoutConfig API

Spoutconfig KafkaConfig का एक विस्तार है जो अतिरिक्त ZooKeeper जानकारी का समर्थन करता है।

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts - ब्रोकरहॉस्ट्स ब्रोकरहॉस्ट्स इंटरफ़ेस का कोई भी कार्यान्वयन हो सकता है

  • Topic - विषय का नाम।

  • zkRoot - चिड़ियाघर कीपर रूट।

  • id −टोंटी ने ज़ुकाइपर में अपनी खपत के आधार पर राज्य को संग्रहीत किया। आईडी को विशिष्ट रूप से आपके टोंटी की पहचान करनी चाहिए।

SchemeAsMultiScheme

स्कीमएम्स मल्तिसेमे एक इंटरफ़ेस है जो यह बताता है कि कफ़्का से प्राप्त बाइटबफ़र कैसे तूफान तूफान में तब्दील हो जाता है। यह MultiScheme से लिया गया है और योजना वर्ग के कार्यान्वयन को स्वीकार करता है। स्कीम वर्ग के कार्यान्वयन के बहुत सारे हैं और एक ऐसा कार्यान्वयन है स्ट्रिंगस्चेम, जो एक साधारण स्ट्रिंग के रूप में बाइट को पार्स करता है। यह आपके आउटपुट फ़ील्ड के नामकरण को भी नियंत्रित करता है। हस्ताक्षर को निम्नानुसार परिभाषित किया गया है।

public SchemeAsMultiScheme(Scheme scheme)
  • Scheme - कफका से भस्म बफर।

KafkaSpout एपीआई

KafkaSpout हमारा टोंटी कार्यान्वयन है, जो स्टॉर्म के साथ एकीकृत होगा। यह काफ्का विषय से मेस-ऋषि प्राप्त करता है और इसे तूफान के रूप में तूफान पारिस्थितिकी तंत्र में फेंक देता है। KafkaSpout को SpoutConfig से इसके विन्यास-विवरण का विवरण मिलता है।

नीचे एक साधारण काफ्का टोंटी बनाने के लिए एक नमूना कोड है।

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

बोल्ट निर्माण

बोल्ट एक घटक है जो ट्यूपल्स को इनपुट के रूप में लेता है, टपल को संसाधित करता है, और आउटपुट के रूप में नए ट्यूपल का उत्पादन करता है। बोल्ट IRichBolt इंटरफ़ेस को लागू करेंगे। इस कार्यक्रम में, दो बोल्ट कक्षाएं WordSplitter-Bolt और WordCounterBolt का उपयोग ऑपरेशन करने के लिए किया जाता है।

IRichBolt इंटरफ़ेस के निम्नलिखित तरीके हैं -

  • Prepare- निष्पादित करने के लिए पर्यावरण के साथ बोल्ट प्रदान करता है। निष्पादक इस पद्धति को टोंटी को आरंभ करने के लिए चलाएगा।

  • Execute - इनपुट के एकल टपल की प्रक्रिया करें।

  • Cleanup - जब एक बोल्ट बंद होने जा रहा है तो कॉल किया गया।

  • declareOutputFields - टपल के आउटपुट स्कीमा की घोषणा करता है।

स्प्लिटबोल्ट.जवा बनाते हैं, जो शब्दों में वाक्य को विभाजित करने के लिए तर्क को लागू करता है और काउंटबोल्ट.जवा, जो अद्वितीय शब्दों को अलग करने के लिए तर्क को लागू करता है और इसकी घटना को गिनता है।

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

टोपोलॉजी के लिए प्रस्तुत करना

स्टॉर्म टोपोलॉजी मूल रूप से एक थ्रिफ्ट संरचना है। TopologyBuilder वर्ग जटिल टोपोलॉजी बनाने के लिए सरल और आसान तरीके प्रदान करता है। TopologyBuilder वर्ग में टोंटी (सेटस्पाउट) और बोल्ट (सेटबोल्ट) सेट करने की विधियाँ हैं। अंत में, टोपोलॉजीब्युल्टोलॉजी ने टोपोलॉजी को क्रियोलॉजी बनाने के लिए बनाया है। शफ़लग्रुपिंग और फ़ील्ड्सग्रुपिंग विधियाँ टोंटी और बोल्ट के लिए धारा समूह निर्धारण में मदद करती हैं।

Local Cluster- विकास के उद्देश्यों के लिए, हम LocalCluster ऑब्जेक्ट का उपयोग करके एक स्थानीय क्लस्टर बना सकते हैं और फिर LocalCluster वर्ग की सबमिटटॉपोलॉजी पद्धति का उपयोग करके टोपोलॉजी जमा कर सकते हैं

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

संकलन को आगे बढ़ाने से पहले, काका-स्टॉर्म एकीकरण को क्यूरेटर ज़ूकिपर क्लाइंट जावा लाइब्रेरी की आवश्यकता है। क्यूरेटर संस्करण 2.9.1 समर्थन अपाचे स्टॉर्म संस्करण 0.9.5 (जो हम इस ट्यूटोरियल में उपयोग करते हैं)। नीचे निर्दिष्ट जार फ़ाइलों को डाउन-लोड करें और इसे जावा क्लास पथ में रखें।

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

निर्भरता फ़ाइलों को शामिल करने के बाद, निम्नलिखित कमांड का उपयोग करके प्रोग्राम को संकलित करें,

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

क्रियान्वयन

काफ्का निर्माता सीएलआई (पिछले अध्याय में समझाया गया) शुरू करें, मेरा पहला-विषय नामक एक नया विषय बनाएं और नीचे दिखाए गए कुछ नमूना संदेश प्रदान करें -

hello
kafka
storm
spark
test message
another test message

अब निम्नलिखित कमांड का उपयोग करके एप्लिकेशन को निष्पादित करें -

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

इस एप्लिकेशन का नमूना आउटपुट नीचे निर्दिष्ट किया गया है -

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2

इस अध्याय में, हम स्पार्क स्ट्रीमिंग एपीआई के साथ अपाचे काफ्का को एकीकृत करने के तरीके के बारे में चर्चा करेंगे।

स्पार्क के बारे में

स्पार्क स्ट्रीमिंग एपीआई स्केलेबल, उच्च-थ्रूपुट, लाइव डेटा धाराओं के दोष-सहिष्णु स्ट्रीम प्रसंस्करण को सक्षम करता है। डेटा को काफ्का, फ्लूम, ट्विटर इत्यादि जैसे कई स्रोतों से प्राप्त किया जा सकता है, और जटिल एल्गोरिदम जैसे मानचित्र, कम, जुड़ने और खिड़की जैसे उच्च-स्तरीय कार्यों का उपयोग करके संसाधित किया जा सकता है। अंत में, संसाधित डेटा को फाइल सिस्टम, डेटाबेस और लाइव डैश-बोर्ड पर धकेल दिया जा सकता है। रेसिलिएंट डिस्ट्रिब्यूटेड डेटासेट्स (आरडीडी) स्पार्क की एक मूलभूत डेटा संरचना है। यह वस्तुओं का एक अपरिवर्तित वितरित संग्रह है। RDD में प्रत्येक डेटासेट को तार्किक विभाजन में विभाजित किया गया है, जिसे क्लस्टर के विभिन्न नोड्स पर गणना की जा सकती है।

स्पार्क के साथ एकीकरण

काफ्का स्पार्क स्ट्रीमिंग के लिए एक संभावित संदेश और एकीकरण मंच है। कफका डेटा की वास्तविक समय की धाराओं के लिए केंद्रीय केंद्र के रूप में कार्य करता है और स्पार्क स्ट्रीमिंग में जटिल एल्गोरिदम का उपयोग करके संसाधित किया जाता है। एक बार डेटा संसाधित होने के बाद, स्पार्क स्ट्रीमिंग एचडीएफएस, डेटाबेस या डैशबोर्ड में अभी तक किसी अन्य काफ़्का विषय या स्टोर में परिणाम प्रकाशित कर सकती है। निम्नलिखित चित्र में वैचारिक प्रवाह को दर्शाया गया है।

अब, हम काफ्का-स्पार्क एपीआई के बारे में विस्तार से जानते हैं।

स्पार्ककोन एपीआई

यह स्पार्क एप्लिकेशन के लिए कॉन्फ़िगरेशन का प्रतिनिधित्व करता है। विभिन्न स्पार्क मापदंडों को कुंजी-मूल्य जोड़े के रूप में सेट करने के लिए उपयोग किया जाता है।

SparkConf वर्ग के निम्नलिखित तरीके हैं -

  • set(string key, string value) - कॉन्फ़िगरेशन चर सेट करें।

  • remove(string key) - कॉन्फ़िगरेशन से कुंजी निकालें।

  • setAppName(string name) - अपने आवेदन के लिए आवेदन नाम सेट करें।

  • get(string key) - कुंजी प्राप्त करें

StreamingContext API

यह स्पार्क कार्यक्षमता के लिए मुख्य प्रवेश बिंदु है। SparkContext एक स्पार्क क्लस्टर से कनेक्शन का प्रतिनिधित्व करता है, और इसका उपयोग क्लस्टर पर RDDs, संचायक और प्रसारण चर बनाने के लिए किया जा सकता है। हस्ताक्षर को नीचे दिखाए अनुसार परिभाषित किया गया है।

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - कनेक्ट करने के लिए क्लस्टर URL (जैसे मेसोस: // होस्ट: पोर्ट, स्पार्क: // होस्ट: पोर्ट, लोकल [4])।

  • appName - क्लस्टर वेब UI पर प्रदर्शित करने के लिए आपकी नौकरी का एक नाम

  • batchDuration - समय अंतराल जिस पर स्ट्रीमिंग डेटा को बैचों में विभाजित किया जाएगा

public StreamingContext(SparkConf conf, Duration batchDuration)

नई स्पार्ककॉन्टेक्ट के लिए आवश्यक कॉन्फ़िगरेशन प्रदान करके एक स्ट्रीमिंग कॉन्टेक्स्ट बनाएं।

  • conf - स्पार्क पैरामीटर

  • batchDuration - समय अंतराल जिस पर स्ट्रीमिंग डेटा को बैचों में विभाजित किया जाएगा

काफ्काटिल्स एपीआई

KafkaUtils API का उपयोग काफ्का क्लस्टर को स्पार्क स्ट्रीमिंग से जोड़ने के लिए किया जाता है। इस एपीआई में नीचे की तरह परिभाषित साइन -कैंट विधि createStream हस्ताक्षर है।

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

ऊपर दी गई विधि का उपयोग एक इनपुट स्ट्रीम बनाने के लिए किया जाता है जो काफ्का ब्रोकर्स के संदेशों को खींचता है।

  • ssc - StreamingContext ऑब्जेक्ट।

  • zkQuorum - ज़ूकीपर कोरम।

  • groupId - इस उपभोक्ता के लिए ग्रुप आई.डी.

  • topics - उपभोग करने के लिए विषयों का एक नक्शा लौटाएं।

  • storageLevel - प्राप्त वस्तुओं के भंडारण के लिए उपयोग करने के लिए संग्रहण स्तर।

KafkaUtils API के पास एक और तरीका createDirectStream है, जिसका उपयोग एक इनपुट स्ट्रीम बनाने के लिए किया जाता है जो किसी भी रिसीवर का उपयोग किए बिना सीधे Kafka Brokers के संदेशों को खींचता है। यह धारा इस बात की गारंटी दे सकती है कि काफ़्का का प्रत्येक संदेश बिल्कुल एक बार परिवर्तनों में शामिल है।

नमूना आवेदन स्काला में किया जाता है। एप्लिकेशन को संकलित करने के लिए, कृपया sbt , scala बिल्ड टूल ( मावेन के समान) डाउनलोड और इंस्टॉल करें । मुख्य आवेदन कोड नीचे प्रस्तुत किया गया है।

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

स्क्रिप्ट बनाएँ

स्पार्क-काफ्का एकीकरण स्पार्क, स्पार्क स्ट्रीमिंग और स्पार्क कफका एकीकरण जार पर निर्भर करता है। एक नई फ़ाइल build.sbt बनाएँ और एप्लिकेशन विवरण और उसकी निर्भरता निर्दिष्ट करें। एसबीटी जबकि संकलन और आवेदन पैकिंग आवश्यक जार डाउनलोड करेगा।

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

संकलन / पैकेजिंग

एप्लिकेशन की जार फ़ाइल को संकलित करने और पैकेज करने के लिए निम्न कमांड चलाएँ। हमें एप्लिकेशन को चलाने के लिए जार फाइल को स्पार्क कंसोल में जमा करना होगा।

sbt package

स्पार्क के लिए प्रस्तुत करना

काफ्का निर्माता सीएलआई शुरू करें (पिछले अध्याय में समझाया गया है), मेरा पहला-विषय नामक एक नया विषय बनाएं और नीचे दिखाए गए अनुसार कुछ नमूना संदेश प्रदान करें।

Another spark test message

स्पार्क कंसोल को एप्लिकेशन सबमिट करने के लिए निम्न कमांड चलाएँ।

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

इस एप्लिकेशन का नमूना आउटपुट नीचे दिखाया गया है।

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

आइए हम नवीनतम ट्विटर फ़ीड और उसके हैशटैग प्राप्त करने के लिए वास्तविक समय एप्लिकेशन का विश्लेषण करें। इससे पहले हमने काफ्का के साथ स्टॉर्म और स्पार्क का एकीकरण देखा है। दोनों परिदृश्यों में, हमने काफ्का पारिस्थितिकी तंत्र को संदेश भेजने के लिए एक काफ्का निर्माता (सीएलआई का उपयोग करके) बनाया। फिर, तूफ़ान और चिंगारी पूर्णांक में कफ़्का उपभोक्ता का उपयोग करके संदेशों को पढ़ता है और इसे क्रमशः तूफान और स्पार्क पारिस्थितिकी तंत्र में इंजेक्ट करता है। तो, व्यावहारिक रूप से हमें एक काफ्का निर्माता बनाने की आवश्यकता है, जो चाहिए -

  • "ट्विटर स्ट्रीमिंग एपीआई" का उपयोग करके ट्विटर फ़ीड पढ़ें,
  • फ़ीड की प्रक्रिया करें,
  • हैशटैग निकालें और
  • इसे काफ्का को भेजें।

एक बार जब हैशटैग काफ्का, तूफान से प्राप्त कर रहे हैं / स्पार्क एकीकरण सूचनाओं-एनीमेशन प्राप्त करते हैं और तूफान / स्पार्क पारिस्थितिकी तंत्र को भेजें।

ट्विटर स्ट्रीमिंग एपीआई

"ट्विटर स्ट्रीमिंग एपीआई" को किसी भी प्रोग्रामिंग भाषा में एक्सेस किया जा सकता है। "Twitter4j" एक खुला स्रोत है, अनौपचारिक जावा पुस्तकालय, जो आसानी से "ट्विटर स्ट्रीमिंग एपीआई" तक पहुंचने के लिए जावा आधारित मॉड्यूल प्रदान करता है। "Twitter4j" ट्वीट्स को एक्सेस करने के लिए एक श्रोता आधारित रूपरेखा प्रदान करता है। "ट्विटर स्ट्रीमिंग एपीआई" का उपयोग करने के लिए, हमें ट्विटर डेवलपर खाते के लिए साइन इन करना होगा और निम्नलिखित प्राप्त करना चाहिएOAuth प्रमाणीकरण विवरण।

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

डेवलपर अकाउंट बन जाने के बाद, “twitter4j” जार फ़ाइलों को डाउनलोड करें और इसे जावा क्लास पथ में रखें।

पूरा ट्विटर काफ्का निर्माता कोडिंग (KafkaTwitterProducer.java) नीचे सूचीबद्ध है -

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

संकलन

निम्नलिखित कमांड का उपयोग करके एप्लिकेशन को संकलित करें -

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

क्रियान्वयन

दो कंसोल खोलें। ऊपर संकलित एप्लिकेशन को चलाएं जैसा कि नीचे एक कंसोल में दिखाया गया है।

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

स्पार्क / स्टॉर्म के किसी एक अनुप्रयोग को पिछले अध्याय में एक और जीत-दहेज में चलाएं। ध्यान देने वाली मुख्य बात यह है कि उपयोग किया जाने वाला विषय दोनों ही मामलों में समान होना चाहिए। यहाँ, हमने "my-first-topic" विषय नाम के रूप में उपयोग किया है।

उत्पादन

इस एप्लिकेशन का आउटपुट कीवर्ड और ट्विटर के वर्तमान फीड पर निर्भर करेगा। एक नमूना आउटपुट नीचे निर्दिष्ट किया गया है (तूफान एकीकरण)।

. . .
food : 1
foodie : 2
burger : 1
. . .

काफ्का उपकरण “org.apache.kafka.tools। * के तहत पैक किया गया। उपकरण को सिस्टम टूल और प्रतिकृति टूल में वर्गीकृत किया गया है।

तंत्र उपकरण

सिस्टम टूल्स रन क्लास स्क्रिप्ट का उपयोग करके कमांड लाइन से चलाया जा सकता है। वाक्य विन्यास इस प्रकार है -

bin/kafka-run-class.sh package.class - - options

सिस्टम टूल्स में से कुछ नीचे दिए गए हैं -

  • Kafka Migration Tool - इस टूल का इस्तेमाल ब्रोकर को एक वर्जन से दूसरे वर्जन पर माइग्रेट करने के लिए किया जाता है।

  • Mirror Maker - इस उपकरण का उपयोग एक काफ्का क्लस्टर की मिररिंग से दूसरे को प्रदान करने के लिए किया जाता है।

  • Consumer Offset Checker - यह उपकरण विषय और उपभोक्ता समूह के निर्दिष्ट सेट के लिए उपभोक्ता समूह, विषय, विभाजन, ऑफ-सेट, लॉगसाइज़, मालिक प्रदर्शित करता है।

प्रतिकृति उपकरण

काफ्का प्रतिकृति एक उच्च स्तरीय डिजाइन उपकरण है। प्रतिकृति उपकरण जोड़ने का उद्देश्य मजबूत स्थायित्व और उच्च उपलब्धता के लिए है। कुछ प्रतिकृति उपकरण नीचे दिए गए हैं -

  • Create Topic Tool - यह डिफ़ॉल्ट विभाजन, प्रतिकृति कारक के साथ एक विषय बनाता है और प्रतिकृति असाइनमेंट करने के लिए काफ्का की डिफ़ॉल्ट योजना का उपयोग करता है।

  • List Topic Tool- यह उपकरण विषयों की दी गई सूची के लिए सूचना को सूचीबद्ध करता है। यदि कमांड लाइन में कोई विषय प्रदान नहीं किया जाता है, तो टूल सभी प्रश्नों को प्राप्त करने के लिए ज़ुकाइपर से पूछताछ करता है और उनके लिए जानकारी सूचीबद्ध करता है। उपकरण प्रदर्शित करने वाले क्षेत्र विषय का नाम, विभाजन, नेता, प्रतिकृतियां, आईएसआर हैं।

  • Add Partition Tool- किसी विषय का निर्माण, विषय के विभाजन की संख्या को निर्दिष्ट करना होगा। बाद में, विषय के लिए और अधिक विभाजन की आवश्यकता हो सकती है, जब विषय की मात्रा बढ़ जाएगी। यह उपकरण एक विशिष्ट विषय के लिए अधिक विभाजन जोड़ने में मदद करता है और जोड़े गए विभाजन के मैनुअल प्रतिकृति असाइनमेंट की भी अनुमति देता है।

काफ्का आज के कई बेहतरीन औद्योगिक अनुप्रयोगों का समर्थन करता है। हम इस अध्याय में काफ्का के कुछ सबसे उल्लेखनीय अनुप्रयोगों का एक संक्षिप्त विवरण प्रदान करेंगे।

ट्विटर

ट्विटर एक ऑनलाइन सोशल नेटवर्किंग सेवा है जो उपयोगकर्ता के ट्वीट भेजने और प्राप्त करने के लिए एक मंच प्रदान करती है। पंजीकृत उपयोगकर्ता ट्वीट पढ़ और पोस्ट कर सकते हैं, लेकिन अपंजीकृत उपयोगकर्ता केवल ट्वीट पढ़ सकते हैं। ट्विटर स्टॉर्म-काफ्का को उनके स्ट्रीम प्रोसेसिंग इन्फ्रास्ट्रक्चर के एक भाग के रूप में उपयोग करता है।

लिंक्डइन

Apache Kafka का उपयोग लिंक्डइन पर गतिविधि स्ट्रीम डेटा और ऑपरेशनल मेट्रिक्स के लिए किया जाता है। काफ्का मेस-सीजिंग सिस्टम ऑनलाइन संदेश की खपत के लिए लिंक्डइन न्यूज़फ़ीड, लिंक्डइन जैसे विभिन्न उत्पादों के साथ लिंक्डइन में मदद करता है और Hadoop जैसे ऑफ़लाइन विश्लेषिकी प्रणालियों के अलावा। लिंक्डइन के संबंध में काफ्का की मजबूत स्थायित्व भी प्रमुख कारकों में से एक है।

Netflix

नेटफ्लिक्स ऑन-डिमांड इंटरनेट स्ट्रीमिंग मीडिया का एक अमेरिकी बहुराष्ट्रीय प्रदाता है। नेटफ्लिक्स वास्तविक समय की निगरानी और घटना प्रसंस्करण के लिए काफ्का का उपयोग करता है।

mozilla

मोज़िला एक फ्री-सॉफ्टवेयर समुदाय है, जिसे 1998 में नेटस्केप के सदस्यों द्वारा बनाया गया था। काफ़्का जल्द ही टेलीमेट्री, टेस्ट पायलट, आदि जैसी परियोजनाओं के लिए अंतिम-उपयोगकर्ता के ब्राउज़र से प्रदर्शन और उपयोग डेटा एकत्र करने के लिए मोज़िला वर्तमान उत्पादन प्रणाली के एक हिस्से की जगह लेगा।

आकाशवाणी

Oracle अपने एंटरप्राइज़ सर्विस बस उत्पाद से OSB (Oracle Service Bus) नामक काफ़्का को मूल कनेक्टिविटी प्रदान करता है, जो डेवलपर्स को चरणबद्ध डेटा पाइपलाइनों को लागू करने के लिए OSB अंतर्निहित मध्यस्थता क्षमताओं का लाभ उठाने की अनुमति देता है।