अपाचे काफ्का - त्वरित गाइड
बिग डेटा में, डेटा की एक विशाल मात्रा का उपयोग किया जाता है। डेटा के बारे में, हमारे पास दो मुख्य चुनौतियां हैं। पहली चुनौती यह है कि बड़ी मात्रा में डेटा कैसे एकत्र किया जाए और दूसरी चुनौती है एकत्रित डेटा का विश्लेषण करना। उन चुनौतियों से पार पाने के लिए, आपको एक संदेश प्रणाली की आवश्यकता होगी।
कफका वितरित उच्च थ्रूपुट प्रणालियों के लिए डिज़ाइन किया गया है। काफ्का अधिक पारंपरिक संदेश ब्रोकर के प्रतिस्थापन के रूप में बहुत अच्छी तरह से काम करता है। अन्य संदेश प्रणालियों की तुलना में, काफ्का में बेहतर थ्रूपुट, अंतर्निहित विभाजन, प्रतिकृति और निहित दोष-सहिष्णुता है, जो इसे बड़े पैमाने पर संदेश प्रसंस्करण अनुप्रयोगों के लिए एक अच्छा फिट बनाता है।
मैसेजिंग सिस्टम क्या है?
एक संदेश प्रणाली एक अनुप्रयोग से दूसरे में डेटा स्थानांतरित करने के लिए जिम्मेदार है, इसलिए एप्लिकेशन डेटा पर ध्यान केंद्रित कर सकते हैं, लेकिन इसे साझा करने के तरीके के बारे में चिंता न करें। वितरित संदेश विश्वसनीय संदेश कतार की अवधारणा पर आधारित है। संदेश क्लाइंट अनुप्रयोग और संदेश प्रणाली के बीच अतुल्यकालिक रूप से कतारबद्ध हैं। दो प्रकार के मैसेजिंग पैटर्न उपलब्ध हैं - एक है पॉइंट टू पॉइंट और दूसरा है पब्लिश-सब्सक्रिप्शन (पब-सब) मैसेजिंग सिस्टम। ज्यादातर मैसेजिंग पैटर्न फॉलो करते हैंpub-sub।
प्वाइंट टू प्वाइंट मैसेजिंग सिस्टम
एक बिंदु से बिंदु प्रणाली में, संदेश एक कतार में बने रहते हैं। एक या अधिक उपभोक्ता कतार में मौजूद संदेशों का उपभोग कर सकते हैं, लेकिन एक विशेष संदेश का उपभोग अधिकतम एक उपभोक्ता ही कर सकता है। एक बार जब कोई उपभोक्ता कतार में कोई संदेश पढ़ता है, तो वह उस कतार से गायब हो जाता है। इस प्रणाली का विशिष्ट उदाहरण एक ऑर्डर प्रोसेसिंग सिस्टम है, जहां प्रत्येक ऑर्डर को एक ऑर्डर प्रोसेसर द्वारा संसाधित किया जाएगा, लेकिन एकाधिक ऑर्डर प्रोसेसर एक ही समय में भी काम कर सकते हैं। निम्नलिखित चित्र संरचना को दर्शाता है।
प्रकाशन-सदस्यता संदेश प्रणाली
प्रकाशन-सदस्यता प्रणाली में, संदेश एक विषय में बने रहते हैं। पॉइंट-टू-पॉइंट सिस्टम के विपरीत, उपभोक्ता एक या अधिक विषय की सदस्यता ले सकते हैं और उस विषय के सभी संदेशों का उपभोग कर सकते हैं। प्रकाशन-सदस्यता प्रणाली में, संदेश निर्माता को प्रकाशक कहा जाता है और संदेश उपभोक्ताओं को ग्राहक कहा जाता है। एक वास्तविक जीवन का उदाहरण डिश टीवी है, जो विभिन्न चैनलों जैसे खेल, सिनेमा, संगीत आदि को प्रकाशित करता है, और कोई भी अपने स्वयं के चैनल की सदस्यता ले सकता है और जब भी उनके सदस्यता प्राप्त चैनल उपलब्ध हैं, उन्हें प्राप्त कर सकता है।
काफ्का क्या है?
अपाचे काफ्का एक वितरित प्रकाशित-सदस्यता संदेश प्रणाली और एक मजबूत कतार है जो डेटा की एक उच्च मात्रा को संभाल सकती है और आपको एक छोर से दूसरे तक संदेश भेजने में सक्षम बनाती है। काफ्का ऑफ़लाइन और ऑनलाइन संदेश खपत दोनों के लिए उपयुक्त है। कफ़्का संदेशों को डिस्क पर जारी रखा जाता है और डेटा हानि को रोकने के लिए क्लस्टर के भीतर दोहराया जाता है। काफ्का ज़ूकीर सिंक्रोनाइज़ेशन सर्विस के शीर्ष पर बनाया गया है। यह वास्तविक समय के स्ट्रीमिंग डेटा विश्लेषण के लिए अपाचे स्टॉर्म और स्पार्क के साथ बहुत अच्छी तरह से एकीकृत करता है।
लाभ
काफ्का के कुछ लाभ निम्नलिखित हैं -
Reliability - काफ्का वितरित, विभाजन, प्रतिकृति और दोष सहिष्णुता है।
Scalability - काफ्का मैसेजिंग सिस्टम बिना समय के आसानी से तराजू ।।
Durability- काफ्का
डिस्ट्रीब्यूटेड कम लॉग
का उपयोग करता है, जिसका अर्थ है कि संदेश डिस्क पर जितनी जल्दी हो सके, इसलिए टिकाऊ है।Performance- कफ़्का में संदेशों को प्रकाशित करने और सदस्यता देने दोनों के लिए उच्च प्रवाह है। यह स्थिर प्रदर्शन को बनाए रखता है यहां तक कि कई टीबी संदेशों को संग्रहीत किया जाता है।
काफ्का बहुत तेज है और शून्य डाउनटाइम और शून्य डेटा हानि की गारंटी देता है।
बक्सों का इस्तेमाल करें
काफ्का का उपयोग कई उपयोग मामलों में किया जा सकता है। उनमें से कुछ नीचे सूचीबद्ध हैं -
Metrics- कफका अक्सर परिचालन निगरानी डेटा के लिए उपयोग किया जाता है। इसमें परिचालन डेटा के केंद्रीकृत फ़ीड का उत्पादन करने के लिए वितरित अनुप्रयोगों के एकत्रित आंकड़े शामिल हैं।
Log Aggregation Solution - कफ़्का का उपयोग कई सेवाओं से लॉग एकत्र करने और उन्हें मानक प्रारूप में कई कॉन-समर में उपलब्ध कराने के लिए एक संगठन में किया जा सकता है।
Stream Processing- स्टॉर्म और स्पार्क स्ट्रीमिंग जैसे लोकप्रिय ढांचे एक विषय के डेटा को पढ़ते हैं, इसे संसाधित करते हैं, और एक नए विषय पर संसाधित डेटा लिखते हैं जहां यह उपयोगकर्ताओं और अनुप्रयोगों के लिए उपलब्ध हो जाता है। धारा प्रसंस्करण के संदर्भ में काफ्का का मजबूत स्थायित्व भी बहुत उपयोगी है।
कफका की जरूरत है
कफका सभी वास्तविक समय के डेटा फीड को संभालने के लिए एक एकीकृत मंच है। काफ्का कम विलंबता संदेश वितरण का समर्थन करता है और मशीन विफलताओं की उपस्थिति में गलती सहिष्णुता की गारंटी देता है। इसमें बड़ी संख्या में विविध उपभोक्ताओं को संभालने की क्षमता है। काफ़्का बहुत तेज़ है, 2 मिलियन लिखता है / सेकंड। काफ्का डिस्क में सभी डेटा को बनाए रखता है, जिसका अनिवार्य रूप से मतलब है कि सभी राइट्स ओएस (रैम) के पेज कैश पर जाते हैं। यह पेज कैश से नेटवर्क सॉकेट में डेटा स्थानांतरित करने के लिए बहुत कुशल बनाता है।
काफ्का में गहराई से जाने से पहले, आपको मुख्य शब्दावली जैसे विषय, दलाल, निर्माता और उपभोक्ता के बारे में पता होना चाहिए। निम्नलिखित आरेख मुख्य शब्दावली को दिखाता है और तालिका आरेख घटकों का विस्तार से वर्णन करती है।
उपरोक्त आरेख में, एक विषय को तीन विभाजनों में कॉन्फ़िगर किया गया है। विभाजन 1 में दो ऑफसेट कारक 0 और 1. विभाजन 2 में चार ऑफसेट कारक 0, 1, 2 और 3 हैं। विभाजन 3 में एक ऑफसेट कारक 0. है। प्रतिकृति की आईडी उसी सर्वर की आईडी के समान है जो इसे होस्ट करता है।
मान लें, यदि विषय का प्रतिकृति कारक 3 पर सेट है, तो काफ्का प्रत्येक विभाजन के 3 समान प्रतिकृतियां बनाएगा और उन्हें अपने सभी कार्यों के लिए उपलब्ध कराने के लिए क्लस्टर में रखेगा। क्लस्टर में लोड को संतुलित करने के लिए, प्रत्येक ब्रोकर उन विभाजनों में से एक या अधिक को संग्रहीत करता है। एक ही समय में कई निर्माता और उपभोक्ता संदेशों को प्रकाशित और पुनः प्राप्त कर सकते हैं।
S.No | अवयव और विवरण |
---|---|
1 | Topics किसी विशेष श्रेणी से संबंधित संदेशों की एक धारा को एक विषय कहा जाता है। डेटा विषयों में संग्रहीत किया जाता है। विषय विभाजन में विभाजित हैं। प्रत्येक विषय के लिए, काफ्का एक विभाजन का एक मिनी-मम रखता है। इस तरह के प्रत्येक विभाजन में अपरिवर्तनीय क्रम में संदेश होते हैं। एक विभाजन को समान आकारों के खंड फ़ाइलों के सेट के रूप में कार्यान्वित किया जाता है। |
2 | Partition विषय में कई विभाजन हो सकते हैं, इसलिए यह डेटा की एक मनमानी राशि को संभाल सकता है। |
3 | Partition offset प्रत्येक विभाजित संदेश में एक अद्वितीय अनुक्रम आईडी है जिसे |
4 | Replicas of partition प्रतिकृतियां विभाजन के |
5 | Brokers
|
6 | Kafka Cluster काफ्का के एक से अधिक ब्रोकर होने के कारण उसे काफ्का क्लस्टर कहा जाता है। एक काफ्का क्लस्टर को बिना डाउनटाइम के विस्तारित किया जा सकता है। ये क्लस्टर संदेश डेटा की दृढ़ता और प्रतिकृति का प्रबंधन करने के लिए उपयोग किए जाते हैं। |
7 | Producers निर्माता एक या अधिक काफ्का विषयों के संदेशों के प्रकाशक हैं। निर्माता कफका दलालों को डेटा भेजते हैं। जब भी कोई प्रोड्यूसर किसी ब्रोकर को मैसेज देता है, ब्रोकर मैसेज को लास्ट सेगमेंट फाइल में डाल देता है। दरअसल, मैसेज को एक पार्टीशन में जोड़ा जाएगा। निर्माता अपनी पसंद के विभाजन के लिए संदेश भी भेज सकते हैं। |
8 | Consumers उपभोक्ता दलालों से डेटा पढ़ते हैं। उपभोक्ता एक या अधिक विषयों की सदस्यता लेते हैं और दलालों से डेटा खींचकर प्रकाशित संदेशों का उपभोग करते हैं। |
9 | Leader
|
10 | Follower नेता निर्देशों का पालन करने वाले नोड को अनुयायी कहा जाता है। यदि नेता विफल हो जाता है, तो अनुयायी में से एक स्वचालित रूप से नया नेता बन जाएगा। एक अनुयायी सामान्य उपभोक्ता के रूप में कार्य करता है, संदेशों को खींचता है और अपने स्वयं के डेटा स्टोर को अद्यतित करता है। |
निम्नलिखित दृष्टांत पर एक नज़र डालें। यह काफ्का के क्लस्टर आरेख को दर्शाता है।
निम्न तालिका उपरोक्त आरेख में दिखाए गए प्रत्येक घटक का वर्णन करती है।
S.No | अवयव और विवरण |
---|---|
1 | Broker लोड संतुलन बनाए रखने के लिए काफ्का क्लस्टर में आमतौर पर कई ब्रोकर होते हैं। काफ्का दलाल स्टेटलेस हैं, इसलिए वे अपने क्लस्टर राज्य को बनाए रखने के लिए ज़ूकेपर का उपयोग करते हैं। एक काफ्का ब्रोकर उदाहरण सैकड़ों-हजारों रीड्स को संभाल सकता है और प्रति सेकंड लिखता है और प्रत्येक ब्रो-केर प्रदर्शन प्रभाव के बिना संदेशों के टीबी को संभाल सकता है। काफ्का दलाल नेता का चुनाव ज़ूकीपर द्वारा किया जा सकता है। |
2 | ZooKeeper ज़ूकेपर का उपयोग काफ्का दलाल के प्रबंधन और समन्वय के लिए किया जाता है। चिड़ियाघर कीपर सेवा का उपयोग मुख्य रूप से निर्माता और उपभोक्ता को काफ्का प्रणाली में किसी नए दलाल की उपस्थिति या काफ्का प्रणाली में दलाल की विफलता के बारे में सूचित करने के लिए किया जाता है। दलाल की उपस्थिति या विफलता के बारे में ज़ुकेर द्वारा प्राप्त अधिसूचना के अनुसार, प्रो-ड्यूरर और उपभोक्ता निर्णय लेते हैं और कुछ अन्य ब्रोकर के साथ अपने कार्य का समन्वय शुरू करते हैं। |
3 | Producers निर्माता दलालों को डेटा धक्का देते हैं। जब नया ब्रोकर शुरू किया जाता है, तो सभी निर्माता इसे खोजते हैं और स्वचालित रूप से उस नए ब्रोकर को संदेश भेजते हैं। काफ्का निर्माता ब्रोकर से पावती के लिए इंतजार नहीं करता है और जितनी तेजी से ब्रोकर संभाल सकता है, उतने संदेश भेजता है। |
4 | Consumers चूंकि काफ्का दलाल स्टेटलेस हैं, जिसका अर्थ है कि उपभोक्ता को यह सुनिश्चित करना होगा कि विभाजन ऑफसेट का उपयोग करके कितने संदेशों का उपभोग किया गया है। यदि उपभोक्ता एक विशेष संदेश को स्वीकार करता है, तो इसका अर्थ है कि उपभोक्ता ने सभी पूर्व संदेशों का उपभोग किया है। उपभोक्ता ब्रोकर को उपभोग करने के लिए तैयार बाइट्स का एक अतुल्यकालिक पुल अनुरोध जारी करता है। उपभोक्ता एक ऑफसेट मूल्य की आपूर्ति करके विभाजन में किसी भी बिंदु पर रिवाइंड या छोड़ सकते हैं। उपभोक्ता ऑफसेट मूल्य को चिड़ियाघर कीपर द्वारा अधिसूचित किया जाता है। |
अब तक, हमने काफ्का की मुख्य अवधारणाओं पर चर्चा की। आइए अब काफ्का के वर्कफ़्लो पर कुछ प्रकाश डालें।
काफ्का बस एक या अधिक विभाजन में विभाजित विषयों का एक संग्रह है। एक काफ्का विभाजन संदेशों का एक क्रमबद्ध रूप से क्रमबद्ध क्रम है, जहां प्रत्येक संदेश को उनके सूचकांक द्वारा पहचाना जाता है (जिसे ऑफसेट कहा जाता है)। एक काफ्का क्लस्टर में सभी डेटा विभाजन का असंबद्ध संघ है। आने वाले संदेश एक विभाजन के अंत में लिखे गए हैं और संदेश उपभोक्ताओं द्वारा क्रमिक रूप से पढ़े जाते हैं। विभिन्न दलालों को संदेश दोहराकर स्थायित्व प्रदान किया जाता है।
काफ्का तेज, विश्वसनीय, निरंतर, दोष-सहिष्णुता और शून्य डाउनटाइम तरीके से पब-उप और कतार आधारित संदेश प्रणाली प्रदान करता है। दोनों ही मामलों में, निर्माता केवल एक विषय पर संदेश भेजते हैं और उपभोक्ता अपनी आवश्यकता के आधार पर किसी भी प्रकार के संदेश प्रणाली को चुन सकता है। हमें यह समझने के लिए कि उपभोक्ता अपनी पसंद का मैसेजिंग सिस्टम कैसे चुन सकते हैं, अगले भाग में दिए गए चरणों का पालन करें।
पब-सब मैसेजिंग का वर्कफ़्लो
निम्नलिखित पब-सब मैसेजिंग का चरणवार वर्कफ़्लो है -
निर्माता नियमित अंतराल पर एक विषय पर संदेश भेजते हैं।
Kafka ब्रोकर उस विशेष विषय के लिए कॉन्फ़िगर किए गए विभाजनों में सभी संदेशों को संग्रहीत करता है। यह सुनिश्चित करता है कि संदेश विभाजन के बीच समान रूप से साझा किए गए हैं। यदि निर्माता दो संदेश भेजता है और दो विभाजन होते हैं, तो काफ्का पहले विभाजन में एक संदेश और दूसरे विभाजन में दूसरा संदेश संग्रहीत करेगा।
उपभोक्ता एक विशिष्ट विषय की सदस्यता लेता है।
एक बार जब उपभोक्ता किसी विषय की सदस्यता ले लेता है, तो काफ्का उपभोक्ता को विषय की वर्तमान ऑफसेट प्रदान करेगा और ज़ुकाइपर पहनावा में ऑफसेट को भी बचाता है।
उपभोक्ता नए संदेशों के लिए एक नियमित अंतराल (जैसे 100 एमएस) में काफ्का का अनुरोध करेगा।
एक बार काफ्का उत्पादकों से संदेश प्राप्त करता है, यह इन संदेशों को उपभोक्ताओं को अग्रेषित करता है।
उपभोक्ता संदेश प्राप्त करेगा और इसे संसाधित करेगा।
संदेशों के संसाधित होने के बाद, उपभोक्ता काफ्का दलाल को एक पावती भेजेगा।
एक बार जब काफ्का को एक पावती मिल जाती है, तो वह ऑफसेट को नए मूल्य में बदल देता है और इसे ज़ूकीपर में अपडेट करता है। चूंकि ज़ुकाइटर में ऑफ़सेट बनाए जाते हैं, इसलिए उपभोक्ता सर्वर के दौरान भी अगले संदेश को सही ढंग से पढ़ सकता है।
यह उपरोक्त प्रवाह तब तक दोहराएगा जब तक कि उपभोक्ता अनुरोध को रोक नहीं देता है।
उपभोक्ता के पास किसी भी समय किसी विषय की वांछित ऑफ़र्स को रिवाइंड / स्किप करने का विकल्प होता है और बाद के सभी संदेशों को पढ़ सकता है।
कतार मैसेजिंग / उपभोक्ता समूह का वर्कफ़्लो
एक एकल उपभोक्ता के बजाय एक कतार संदेश प्रणाली में, एक ही समूह ID
वाले उपभोक्ताओं का समूह
किसी विषय की सदस्यता लेगा। सरल शब्दों में, एक ही ग्रुप आईडी वाले
किसी विषय की सदस्यता लेने वाले उपभोक्ताओं को एक ही समूह
माना जाता है और उनके बीच संदेश साझा किए जाते हैं। आइए हम इस प्रणाली के वास्तविक वर्कफ़्लो की जाँच करें।
निर्माता एक विषय के लिए एक नियमित अंतराल में संदेश भेजते हैं।
कफ़्का उस विशेष विषय के लिए पहले के परिदृश्य के समान कॉन्फ़िगर किए गए विभाजनों में सभी संदेशों को संग्रहीत करता है।
एक एकल उपभोक्ता एक विशिष्ट विषय की सदस्यता लेता है,
समूह -1 के
रूप मेंग्रुप
आईडी के
साथटॉपिक -01
ग्रहण करता है ।कफ़्का उपभोक्ता के साथ उसी तरह से बातचीत करता है जैसे पब-सब मैसेजिंग जब तक नया उपभोक्ता एक ही विषय,
टॉपिक -01
उसीग्रुप आईडी के
साथग्रुप -1 के
रूप में सदस्यता नहीं लेता है ।एक बार जब नया उपभोक्ता आता है, तो काफ्का अपने ऑपरेशन को साझा करने के लिए स्विच करता है और दोनों उपभोक्ताओं के बीच डेटा साझा करता है। यह बंटवारा तब तक चलेगा जब तक कि उस विशेष विषय के लिए कॉन्फ़िगर किए गए विभाजन की संख्या तक पहुँचने वाले की संख्या नहीं हो जाती।
एक बार जब उपभोक्ता की संख्या विभाजन से अधिक हो जाती है, तो नया उपभोक्ता तब तक कोई और संदेश प्राप्त नहीं करेगा, जब तक कि कोई भी मौजूदा उपभोक्ता अनिर्णायक न हो। यह परिदृश्य उत्पन्न होता है क्योंकि काफ्का में प्रत्येक उपभोक्ता को एक विभाजन का एक न्यूनतम सौंपा जाएगा और एक बार सभी विभाजन मौजूदा उपभोक्ताओं को सौंपे जाने के बाद, नए उपभोक्ताओं को इंतजार करना होगा।
इस सुविधा को
उपभोक्ता समूह
भी कहा जाता है । उसी तरह, काफ्का दोनों प्रणालियों को बहुत सरल और कुशल तरीके से प्रदान करेगा।
चिड़ियाघर कीपर की भूमिका
Apache Kafka की एक महत्वपूर्ण निर्भरता Apache Zookeeper है, जो एक वितरित कॉन्फ़िगरेशन और सिंक्रनाइज़ेशन सेवा है। ज़ूकीफ़र काफ्का दलालों और उपभोक्ताओं के बीच समन्वय इंटरफ़ेस के रूप में कार्य करता है। काफ्का सर्वर एक ज़ुकीपर क्लस्टर के माध्यम से जानकारी साझा करता है। कफ़का ज़ूकीपर में बुनियादी मेटाडेटा संग्रहीत करता है जैसे कि विषयों, दलालों, उपभोक्ता ऑफसेट (कतार पाठकों) के बारे में जानकारी और इतने पर।
चूंकि सभी महत्वपूर्ण जानकारी ज़ुकाइपर में संग्रहीत होती है और यह सामान्य रूप से इस डेटा को अपने पहनावा में भरती है, काफ्का ब्रोकर / ज़ूकीपर की विफलता काफ्का क्लस्टर की स्थिति को प्रभावित नहीं करती है। एक बार ज़ूकीर के पुनरारंभ होने पर, काफ्का राज्य को बहाल करेगा। यह काफ्का के लिए शून्य डाउनटाइम देता है। कफ़्का दलाल के बीच नेता का चुनाव नेता की विफलता की स्थिति में ज़ुकीपर का उपयोग करके भी किया जाता है।
ज़ुकीपर के बारे में अधिक जानने के लिए, ज़ूकीपर को देखें
हमें अगले अध्याय में अपनी मशीन पर जावा, ज़ूकिपर, और काफ्का स्थापित करने के तरीके पर आगे जारी रखना चाहिए।
अपनी मशीन पर जावा को स्थापित करने के चरण निम्नलिखित हैं।
चरण 1 - जावा स्थापना का सत्यापन
उम्मीद है कि आपने अभी-अभी अपनी मशीन पर जावा को स्थापित किया है, इसलिए आप केवल निम्न कमांड का उपयोग करके इसे सत्यापित करते हैं।
$ java -version
यदि जावा आपकी मशीन पर सफलतापूर्वक स्थापित है, तो आप स्थापित जावा का संस्करण देख सकते हैं।
चरण 1.1 - JDK डाउनलोड करें
यदि जावा डाउनलोड नहीं किया गया है, तो कृपया निम्न लिंक पर जाकर JDK का नवीनतम संस्करण डाउनलोड करें और नवीनतम संस्करण डाउनलोड करें।
http://www.oracle.com/technetwork/java/javase/downloads/index.htmlअब नवीनतम संस्करण JDK 8u 60 है और फ़ाइल "jdk-8u60-linux-x64.tar.gz" है। कृपया अपनी मशीन पर फ़ाइल डाउनलोड करें।
चरण 1.2 - फ़ाइलें निकालें
आम तौर पर, डाउनलोड की जा रही फ़ाइलों को डाउनलोड फ़ोल्डर में संग्रहीत किया जाता है, इसे सत्यापित करें और निम्न आदेशों का उपयोग करके टार सेटअप निकालें।
$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz
चरण 1.3 - ऑप्ट निर्देशिका में ले जाएँ
जावा को सभी उपयोगकर्ताओं के लिए उपलब्ध कराने के लिए, निकाले गए जावा सामग्री को usr / स्थानीय / जावा
/ फ़ोल्डर में ले जाएँ।
$ su
password: (type password of root user)
$ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/
चरण १.४ - पथ निर्धारित करें
पथ और JAVA_HOME चर सेट करने के लिए, ~ / .bashrc फ़ाइल में निम्न कमांड जोड़ें।
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin
अब सभी परिवर्तनों को वर्तमान चल रही प्रणाली में लागू करें।
$ source ~/.bashrc
चरण 1.5 - जावा अल्टरनेटिव
जावा अल्टरनेटिव्स को बदलने के लिए निम्न कमांड का उपयोग करें।
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
Step 1.6 - अब चरण 1 में बताई गई सत्यापन कमांड (जावा-वर्सन) का उपयोग करके जावा को सत्यापित करें।
चरण 2 - चिड़ियाघर कीपर फ्रेमवर्क स्थापना
चरण 2.1 - चिड़ियाघरकीपर डाउनलोड करें
अपनी मशीन पर चिड़ियाघरकीपर फ्रेमवर्क स्थापित करने के लिए, निम्न लिंक पर जाएं और चिड़ियाघरकीपर का नवीनतम संस्करण डाउनलोड करें।
http://zookeeper.apache.org/releases.htmlअब तक, ज़ूकीपर का नवीनतम संस्करण 3.4.6 (ज़ूकीपर-3.4.6.tar.gz) है।
चरण 2.2 - टार फ़ाइल निकालें
निम्नलिखित कमांड का उपयोग करके टार फाइल को निकालें
$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6
$ mkdir data
चरण 2.3 - कॉन्फ़िगरेशन फ़ाइल बनाएँ
Open कॉन्फ़िगरेशन फ़ाइल जिसका नाम vi / zoo.cfg है
, कमांड vi "conf / zoo.cfg" और शुरुआती बिंदु के रूप में सेट करने के लिए निम्नलिखित सभी मापदंडों का उपयोग करता है।
$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
एक बार कॉन्फ़िगरेशन फ़ाइल सफलतापूर्वक सहेज ली गई है और फिर से टर्मिनल पर वापस आ जाती है, तो आप ज़ूकीपर सर्वर शुरू कर सकते हैं।
चरण 2.4 - ज़ूकीपर सर्वर प्रारंभ करें
$ bin/zkServer.sh start
इस कमांड को निष्पादित करने के बाद, आपको नीचे दी गई प्रतिक्रिया मिल जाएगी -
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED
चरण 2.5 - सीएलआई शुरू करें
$ bin/zkCli.sh
उपरोक्त कमांड टाइप करने के बाद, आप ज़ूकीपर सर्वर से जुड़ जाएंगे और नीचे की प्रतिक्रिया प्राप्त करेंगे।
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]
चरण 2.6 - ज़ूकीपर सर्वर बंद करो
सर्वर से कनेक्ट करने और सभी ऑपरेशन करने के बाद, आप zookeeper सर्वर को निम्न कमांड के साथ रोक सकते हैं -
$ bin/zkServer.sh stop
अब आपने अपनी मशीन पर Java और ZooKeeper सफलतापूर्वक स्थापित कर लिया है। आइये देखते हैं अपाचे काफ्का को स्थापित करने के चरण।
चरण 3 - अपाचे काफ्का स्थापना
हमें अपनी मशीन पर काफ्का स्थापित करने के लिए निम्नलिखित चरणों के साथ जारी रखें।
चरण 3.1 - काफ्का डाउनलोड करें
अपनी मशीन पर काफ्का स्थापित करने के लिए, नीचे दिए गए लिंक पर क्लिक करें -
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgzअब नवीनतम संस्करण है, - kafka_2.11_0.9.0.0.tgz आपकी मशीन पर डाउनलोड किया जाएगा।
चरण 3.2 - टार फ़ाइल को निकालें
निम्नलिखित कमांड का उपयोग करके टार फाइल को निकालें -
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0
अब आपने अपनी मशीन पर काफ्का का नवीनतम संस्करण डाउनलोड किया है।
चरण 3.3 - सर्वर प्रारंभ करें
आप निम्नलिखित कमांड देकर सर्वर शुरू कर सकते हैं -
$ bin/kafka-server-start.sh config/server.properties
सर्वर शुरू होने के बाद, आप अपनी स्क्रीन पर नीचे की प्रतिक्रिया देखेंगे -
$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….
चरण 4 - सर्वर बंद करो
सभी ऑपरेशन करने के बाद, आप निम्न कमांड का उपयोग करके सर्वर को बंद कर सकते हैं -
$ bin/kafka-server-stop.sh config/server.properties
अब जब हम पहले ही काफ्का स्थापना पर चर्चा कर चुके हैं, तो हम अगले अध्याय में काफ्का पर बुनियादी संचालन करने का तरीका सीख सकते हैं।
पहले हमें सिंगल नोड-सिंगल ब्रोकर
कॉन्फ़िगरेशन लागू करना शुरू करते हैं और फिर हम अपने सेटअप को सिंगल नोड-मल्टीपल ब्रोकर्स कॉन्फ़िगरेशन में माइग्रेट करेंगे।
उम्मीद है कि आपने अब तक अपनी मशीन पर जावा, ज़ूकीपर और काफ्का स्थापित किया होगा। काफ्का क्लस्टर सेटअप में जाने से पहले, आपको अपना चिड़ियाघर कीपर शुरू करना होगा क्योंकि काफ्का क्लस्टर ज़ूकीपर का उपयोग करता है।
ZooKeeper प्रारंभ करें
एक नया टर्मिनल खोलें और निम्न कमांड टाइप करें -
bin/zookeeper-server-start.sh config/zookeeper.properties
काफ्का ब्रोकर शुरू करने के लिए, निम्न कमांड टाइप करें -
bin/kafka-server-start.sh config/server.properties
काफ्का ब्रोकर शुरू करने के बाद, ज़ूकेपर टर्मिनल पर कमांड जेपी
टाइप करें और आपको निम्नलिखित प्रतिक्रिया दिखाई देगी -
821 QuorumPeerMain
928 Kafka
931 Jps
अब आप टर्मिनल पर चलने वाले दो डेमन को देख सकते हैं जहाँ क्वोरमपेरमैन ज़ूकेर डेमन है और दूसरा काफ्का डेमन है।
सिंगल नोड-सिंगल ब्रोकर कॉन्फ़िगरेशन
इस कॉन्फ़िगरेशन में आपके पास एक एकल ज़ूकीपर और ब्रोकर आईडी उदाहरण है। इसे कॉन्फ़िगर करने के चरण निम्नलिखित हैं -
Creating a Kafka Topic- Kafka सर्वर पर विषय बनाने के लिए kafka-topics.sh
नाम की एक कमांड लाइन उपयोगिता प्रदान करता है । नया टर्मिनल खोलें और नीचे का उदाहरण लिखें।
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
हमने केवल हैलो-काफ्का
नामक एक एकल विभाजन और एक प्रतिकृति कारक के साथ एक विषय बनाया । उपरोक्त निर्मित आउटपुट निम्न आउटपुट के समान होगा -
Output- हैलो-काफ्का
विषय बनाया
विषय बन जाने के बाद, आप काफ्का ब्रोकर टर्मिनल विंडो में नोटिफिकेशन प्राप्त कर सकते हैं और “/ tmp / kafka-लॉग /” में निर्दिष्ट विषय के लिए लॉग इन कर सकते हैं।
विषयों की सूची
काफ्का सर्वर में विषयों की एक सूची प्राप्त करने के लिए, आप निम्नलिखित कमांड का उपयोग कर सकते हैं -
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181
Output
Hello-Kafka
चूंकि हमने एक विषय बनाया है, यह केवल हैलो-काफ्का
को सूचीबद्ध करेगा । मान लीजिए, यदि आप एक से अधिक विषय बनाते हैं, तो आपको विषय के नाम आउटपुट में मिलेंगे।
संदेश भेजने के लिए निर्माता शुरू करें
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
उपरोक्त वाक्य रचना से, निर्माता कमांड लाइन क्लाइंट के लिए दो मुख्य मापदंडों की आवश्यकता होती है -
Broker-list- दलालों की सूची जिसे हम संदेश भेजना चाहते हैं। इस मामले में हमारे पास केवल एक दलाल है। Config / server.properties फ़ाइल में ब्रोकर पोर्ट आईडी है, क्योंकि हम जानते हैं कि हमारा ब्रोकर पोर्ट 9092 पर सुन रहा है, इसलिए आप इसे सीधे निर्दिष्ट कर सकते हैं।
विषय नाम - यहाँ विषय के नाम के लिए एक उदाहरण है।
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
निर्माता स्टड से इनपुट पर इंतजार करेगा और काफ्का क्लस्टर में प्रकाशित करेगा। डिफ़ॉल्ट रूप से, प्रत्येक नई पंक्ति को एक नए संदेश के रूप में प्रकाशित किया जाता है, फिर डिफ़ॉल्ट निर्माता गुण कॉन्फ़िगर / निर्माता
में निर्दिष्ट होते हैं । अब आप टर्मिनल में कुछ संदेश टाइप कर सकते हैं जैसा कि नीचे दिखाया गया है।
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
संदेश प्राप्त करने के लिए उपभोक्ता शुरू करें
निर्माता के समान, डिफ़ॉल्ट उपभोक्ता गुण config / Consumer.proper-संबंधों
फ़ाइल में निर्दिष्ट हैं । एक नया टर्मिनल खोलें और संदेशों का उपभोग करने के लिए नीचे दिए गए सिंटैक्स टाइप करें।
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning
Output
Hello
My first message
My second message
अंत में, आप निर्माता के टर्मिनल से संदेश दर्ज कर सकते हैं और उन्हें उपभोक्ता के टर्मिनल में प्रदर्शित होते हुए देख सकते हैं। अब तक, आपके पास एकल ब्रोकर के साथ एकल नोड क्लस्टर पर बहुत अच्छी समझ है। आइए अब हम कई दलालों के कॉन्फ़िगरेशन पर चलते हैं।
सिंगल नोड-मल्टीपल ब्रोकर्स कॉन्फ़िगरेशन
कई ब्रोकर्स क्लस्टर सेटअप पर जाने से पहले, पहले अपना चिड़ियाघरकीपर सर्वर शुरू करें।
Create Multiple Kafka Brokers- हमारे पास पहले से ही con-fig / server.properties में एक काफ्का ब्रोकर उदाहरण है। अब हमें कई ब्रोकर इंस्टेंसेस की आवश्यकता है, इसलिए मौजूदा server.prop-erties फाइल को दो नई कॉन्फिग फाइल में कॉपी करें और इसे server-one.properties और server-two.prop-erties नाम दें। फिर दोनों नई फ़ाइलों को संपादित करें और निम्नलिखित परिवर्तन असाइन करें -
config / server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
Start Multiple Brokers- तीन सर्वरों पर सभी परिवर्तन किए जाने के बाद, प्रत्येक ब्रोकर को एक-एक करके शुरू करने के लिए तीन नए टर्मिनल खोलें।
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
अब हमारे पास मशीन पर चलने वाले तीन अलग-अलग दलाल हैं। टाइप करके सभी डेमों की जांच करने के लिए अपने आप से प्रयास करेंjps चिड़ियाघरकीपर टर्मिनल पर, फिर आपको प्रतिक्रिया दिखाई देगी।
एक विषय बनाना
आइए हम इस विषय के लिए तीन के रूप में प्रतिकृति कारक मान प्रदान करें क्योंकि हमारे पास तीन अलग-अलग दलाल चल रहे हैं। यदि आपके पास दो दलाल हैं, तो निर्दिष्ट प्रतिकृति मान दो होगा।
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
Output
created topic “Multibrokerapplication”
वर्णन करें
आदेश जो दलाल वर्तमान बनाई विषय पर सुन रहा है जैसा कि नीचे दिखाया जाँच करने के लिए प्रयोग किया जाता है -
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Output
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1
ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1
उपरोक्त आउटपुट से, हम यह निष्कर्ष निकाल सकते हैं कि पहली पंक्ति में सभी विभाजनों का सारांश दिया गया है, जो विषय का नाम, विभाजन गणना और प्रतिकृति कारक है जो हमने पहले ही चुना है। दूसरी पंक्ति में, प्रत्येक नोड विभाजन के एक बेतरतीब ढंग से चयनित भाग के लिए नेता होगा।
हमारे मामले में, हम देखते हैं कि हमारा पहला ब्रोकर (दलाल 0 के साथ) नेता है। तब प्रतिकृतियां: 0,2,1 का मतलब है कि सभी दलालों विषय को दोहराने अंत में Isr
का सेट है में-सिंक
प्रतिकृतियां। खैर, यह प्रतिकृतियों का सबसेट है जो वर्तमान में जीवित हैं और नेता द्वारा पकड़ा जाता है।
संदेश भेजने के लिए निर्माता शुरू करें
यह प्रक्रिया एकल ब्रोकर सेटअप की तरह ही रहती है।
Example
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
Output
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message
संदेश प्राप्त करने के लिए उपभोक्ता शुरू करें
यह प्रक्रिया वैसी ही रहती है जैसी सिंगल ब्रोकर सेटअप में दिखाई जाती है।
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning
Output
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
मूल विषय संचालन
इस अध्याय में हम विभिन्न बुनियादी विषय संचालन पर चर्चा करेंगे।
एक विषय को संशोधित करना
जैसा कि आप पहले ही समझ गए हैं कि काफ्का क्लस्टर में एक विषय कैसे बनाया जाए। अब हम निम्न कमांड का उपयोग करके एक निर्मित विषय को संशोधित करते हैं
Syntax
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
Example
We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2
Output
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
एक विषय हटाना
किसी विषय को हटाने के लिए, आप निम्न सिंटैक्स का उपयोग कर सकते हैं।
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
Example
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Output
> Topic Hello-kafka marked for deletion
Note −इसका कोई प्रभाव नहीं पड़ेगा अगर delete.topic.enable सत्य पर सेट नहीं है
जावा क्लाइंट का उपयोग करके संदेशों को प्रकाशित करने और उपभोग करने के लिए एक एप्लिकेशन बनाएं। कफका उत्पादक ग्राहक में निम्नलिखित एपीआई के होते हैं।
काफ्काप्रोड्यूसर एपीआई
आइए इस खंड में काफ्का निर्माता एपीआई के सबसे महत्वपूर्ण सेट को समझते हैं। KafkaProducer API का मध्य भाग KafkaProducer
वर्ग है। KafkaProducer वर्ग निम्न विधियों के साथ अपने निर्माता में एक Kafka दलाल को जोड़ने का विकल्प प्रदान करता है।
KafkaProducer वर्ग किसी विषय पर अतुल्यकालिक संदेश भेजने के लिए विधि भेजता है। भेजें () का हस्ताक्षर इस प्रकार है
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
ProducerRecord - निर्माता भेजे जाने के लिए रिकॉर्ड के एक बफर का प्रबंधन करता है।
Callback - जब सर्वर द्वारा रिकॉर्ड को स्वीकार कर लिया गया है तो सर्वर निष्पादित करने के लिए उपयोगकर्ता को कॉलबैक निष्पादित करता है (अशक्त कोई कॉलबैक इंगित करता है)।
KafkaProducer वर्ग वास्तव में पूरा हो गया है पहले से भेजे गए संदेशों को सुनिश्चित करने के लिए एक फ्लश विधि प्रदान करता है। फ्लश विधि का सिंटैक्स इस प्रकार है -
public void flush()
काफ्काप्रोड्यूसर वर्ग पार्टीशन विधि प्रदान करता है, जो किसी दिए गए विषय के लिए विभाजन मेटाडेटा प्राप्त करने में मदद करता है। यह कस्टम विभाजन के लिए उपयोग किया जा सकता है। इस विधि का हस्ताक्षर इस प्रकार है -
public Map metrics()
यह निर्माता द्वारा बनाए गए आंतरिक मैट्रिक्स का नक्शा लौटाता है।
सार्वजनिक शून्य बंद () - KafkaProducer वर्ग सभी पहले भेजे गए अनुरोधों को पूरा होने तक करीब विधि ब्लॉक प्रदान करता है।
निर्माता एपीआई
निर्माता एपीआई का मध्य भाग निर्माता
वर्ग है। निर्माता वर्ग निम्नलिखित तरीकों से अपने निर्माता में काफ्का दलाल को जोड़ने का विकल्प प्रदान करता है।
निर्माता वर्ग
निर्माता वर्ग को भेजने की विधि प्रदान करता है send निम्नलिखित हस्ताक्षर का उपयोग करके या तो एकल या कई विषयों के लिए संदेश।
public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);
निर्माता दो प्रकार के होते हैं - Sync तथा Async।
समान API कॉन्फ़िगरेशन सिंक
निर्माता पर भी लागू होता है । उनके बीच का अंतर एक सिंक निर्माता है जो सीधे संदेश भेजता है, लेकिन पृष्ठभूमि में संदेश भेजता है। जब आप उच्च थ्रूपुट चाहते हैं, तो Async निर्माता को प्राथमिकता दी जाती है। पिछली रिलीज में 0.8 की तरह, एक async निर्माता के पास त्रुटि हैंडलर को पंजीकृत करने के लिए भेजने () के लिए कॉलबैक नहीं है। यह केवल वर्तमान रिलीज़ में 0.9 में उपलब्ध है।
सार्वजनिक शून्य के करीब ()
निर्माता वर्ग प्रदान करता है close सभी काफ्का ब्रो-केर्स के लिए निर्माता पूल कनेक्शन बंद करने की विधि।
कॉन्फ़िगरेशन सेटिंग्स
प्रोड्यूसर एपीआई की मुख्य कॉन्फ़िगरेशन सेटिंग्स बेहतर अंडर-स्टैंडिंग के लिए निम्न तालिका में सूचीबद्ध हैं -
S.No | कॉन्फ़िगरेशन सेटिंग्स और विवरण |
---|---|
1 | client.id निर्माता आवेदन की पहचान करता है |
2 | producer.type या तो सिंक या async |
3 | acks एक्स कॉन्फिग्यूशन निर्माता के अनुरोधों के तहत मानदंड को नियंत्रित करता है जो पूर्ण रूप से को-साइडेड हैं। |
4 | retries यदि निर्माता अनुरोध विफल हो जाता है, तो स्वचालित रूप से विशिष्ट मूल्य के साथ पुन: प्रयास करें। |
5 | bootstrap.servers दलालों की बूटस्ट्रैपिंग सूची। |
6 | linger.ms यदि आप अनुरोधों की संख्या को कम करना चाहते हैं तो आप linger.ms को कुछ मूल्य से अधिक के लिए सेट कर सकते हैं। |
7 | key.serializer सीरियल इंटरफ़ेस के लिए कुंजी। |
8 | value.serializer धारावाहिक इंटरफ़ेस के लिए मूल्य। |
9 | batch.size बफर आकार। |
10 | buffer.memory बफ़रिंग के लिए निर्माता को उपलब्ध स्मृति की कुल मात्रा को नियंत्रित करता है। |
निर्माता। एपीआई
ProducerRecord एक कुंजी / मान युग्म है, जो निम्न हस्ताक्षर का उपयोग करके विभाजन, कुंजी और मूल्य जोड़े के साथ एक रिकॉर्ड बनाने के लिए Kafka क्लस्टर में भेजा जाता है ।roducerRecord वर्ग निर्माता।
public ProducerRecord (string topic, int partition, k key, v value)
Topic - उपयोगकर्ता परिभाषित विषय नाम जो रिकॉर्ड करने के लिए संलग्न होगा।
Partition - विभाजन की गिनती
Key - कुंजी जो रिकॉर्ड में शामिल होगी।
- Value - रिकॉर्ड सामग्री
public ProducerRecord (string topic, k key, v value)
ProducerRecord क्लास कंस्ट्रक्टर का उपयोग कुंजी, मूल्य जोड़े और विभाजन के बिना रिकॉर्ड बनाने के लिए किया जाता है।
Topic - रिकॉर्ड असाइन करने के लिए एक विषय बनाएं।
Key - रिकॉर्ड के लिए कुंजी।
Value - रिकॉर्ड सामग्री।
public ProducerRecord (string topic, v value)
ProducerRecord वर्ग विभाजन और कुंजी के बिना एक रिकॉर्ड बनाता है।
Topic - एक विषय बनाएँ।
Value - रिकॉर्ड सामग्री।
ProducerRecord वर्ग विधियों को निम्न तालिका में सूचीबद्ध किया गया है -
S.No | क्लास के तरीके और विवरण |
---|---|
1 | public string topic() विषय रिकॉर्ड के लिए अपील करेंगे। |
2 | public K key() कुंजी जिसे रिकॉर्ड में शामिल किया जाएगा। यदि ऐसी कोई कुंजी नहीं है, तो नल को फिर से चालू कर दिया जाएगा। |
3 | public V value() सामग्री रिकॉर्ड करें। |
4 | partition() रिकॉर्ड के लिए विभाजन की गणना |
SimpleProducer आवेदन
एप्लिकेशन बनाने से पहले, सबसे पहले ZooKeeper और Kafka ब्रोकर को प्रारंभ करें और फिर create कमांड कमांड का उपयोग करके काफ्का ब्रोकर में अपना विषय बनाएं। उसके बाद Sim-pleProducer.java
नाम से एक java class बनाएँ
और निम्नलिखित कोडिंग में टाइप करें।
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
Compilation - एप्लिकेशन को निम्न कमांड का उपयोग करके संकलित किया जा सकता है।
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution - एप्लिकेशन को निम्न कमांड का उपयोग करके निष्पादित किया जा सकता है।
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
Output
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10
सरल उपभोक्ता उदाहरण
अब तक हमने काफ्का क्लस्टर को संदेश भेजने के लिए एक निर्माता बनाया है। अब कफका क्लस्टर के रूप में संदेशों का उपभोग करने के लिए एक उपभोक्ता बनाते हैं। KafkaConsumer API का उपयोग काफ्का क्लस्टर के संदेशों का उपभोग करने के लिए किया जाता है। KafkaConsumer वर्ग निर्माता नीचे परिभाषित किया गया है।
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - कंज्यूमर कॉन्फिग का मैप वापस करें।
KafkaConsumer वर्ग में निम्नलिखित महत्वपूर्ण विधियाँ हैं जो नीचे दी गई तालिका में सूचीबद्ध हैं।
S.No | विधि और विवरण |
---|---|
1 | public java.util.Set<TopicPar-tition> assignment() वर्तमान में con-sumer द्वारा निर्दिष्ट विभाजन का सेट प्राप्त करें। |
2 | public string subscription() डायनामिक रूप से हस्ताक्षरित विभाजन प्राप्त करने के लिए विषयों की दी गई सूची की सदस्यता लें। |
3 | public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) डायनामिक रूप से हस्ताक्षरित विभाजन प्राप्त करने के लिए विषयों की दी गई सूची की सदस्यता लें। |
4 | public void unsubscribe() विभाजन की दी गई सूची से विषयों को अनसब्सक्राइब करें। |
5 | public void sub-scribe(java.util.List<java.lang.String> topics) डायनामिक रूप से हस्ताक्षरित विभाजन प्राप्त करने के लिए विषयों की दी गई सूची की सदस्यता लें। यदि विषयों की दी गई सूची खाली है, तो इसे सदस्यता समाप्त () के समान माना जाता है। |
6 | public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) तर्क पैटर्न नियमित अभिव्यक्ति के प्रारूप में सदस्यता पैटर्न को संदर्भित करता है और श्रोता तर्क को सदस्यता पैटर्न से सूचनाएं मिलती हैं। |
7 | public void as-sign(java.util.List<TopicParti-tion> partitions) मैन्युअल रूप से ग्राहक को विभाजन की एक सूची सौंपें। |
8 | poll() सब्स्क्राइब / असाइन किए गए API में से किसी एक का उपयोग करके निर्दिष्ट किए गए विषयों या विभाजनों के लिए डेटा प्राप्त करें। यदि डेटा के लिए मतदान से पहले विषयों की सदस्यता नहीं ली जाती है, तो यह त्रुटि लौटाएगा। |
9 | public void commitSync() विषयों और विभाजनों की सभी उप-स्क्राइब सूची के लिए पिछले चुनाव () पर कमेटी ऑफसेट लौटी। एक ही ऑपरेशन को कमिशन () के लिए लागू किया जाता है। |
10 | public void seek(TopicPartition partition, long offset) वर्तमान ऑफसेट मूल्य प्राप्त करें जो उपभोक्ता अगले पोल () विधि पर उपयोग करेगा। |
1 1 | public void resume() रुके हुए विभाजन फिर से शुरू करें। |
12 | public void wakeup() उपभोक्ता को जगाएं। |
ConsumerRecord API
ConsumerRecord API का उपयोग काफ्का क्लस्टर से रिकॉर्ड प्राप्त करने के लिए किया जाता है। इस एपीआई में एक विषय का नाम, विभाजन संख्या, जिसमें से रिकॉर्ड प्राप्त किया जा रहा है और एक ऑफसेट जो कि काफ्का विभाजन में रिकॉर्ड को इंगित करता है। ConsumerRecord वर्ग का उपयोग विशिष्ट विषय नाम, विभाजन गणना और <कुंजी, मान> जोड़े के साथ एक उपभोक्ता रिकॉर्ड बनाने के लिए किया जाता है। इसके निम्नलिखित हस्ताक्षर हैं।
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
Topic - काफ्का क्लस्टर से प्राप्त उपभोक्ता रिकॉर्ड का विषय नाम।
Partition - विषय के लिए विभाजन।
Key - रिकॉर्ड की कुंजी, यदि कोई कुंजी मौजूद नहीं है तो शून्य वापस कर दी जाएगी।
Value - रिकॉर्ड सामग्री।
ConsumerRecords API
ConsumerRecords API ConsumerRecord के लिए एक कंटेनर के रूप में कार्य करता है। इस API का उपयोग किसी विशेष विषय के लिए प्रति विभाजन ConsumerRecord की सूची को रखने के लिए किया जाता है। इसके कंस्ट्रक्टर को नीचे परिभाषित किया गया है।
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
TopicPartition - किसी विशेष विषय के लिए विभाजन का नक्शा लौटाएं।
Records - ConsumerRecord की सूची लौटाएं।
ConsumerRecords वर्ग में निम्नलिखित तरीके परिभाषित हैं।
S.No | तरीके और विवरण |
---|---|
1 | public int count() सभी विषयों के लिए रिकॉर्ड की संख्या। |
2 | public Set partitions() इस रिकॉर्ड सेट में डेटा के साथ विभाजन का सेट (यदि कोई डेटा वापस नहीं किया गया था तो सेट खाली है)। |
3 | public Iterator iterator() Iterator आपको एक संग्रह के माध्यम से चक्र प्राप्त करने, तत्वों को प्राप्त करने या फिर से स्थानांतरित करने में सक्षम बनाता है। |
4 | public List records() दिए गए विभाजन के रिकॉर्ड की सूची प्राप्त करें। |
कॉन्फ़िगरेशन सेटिंग्स
उपभोक्ता क्लाइंट API मुख्य कॉन्फ़िगरेशन सेटिंग्स के लिए कॉन्फ़िगरेशन सेटिंग्स नीचे सूचीबद्ध हैं -
S.No | सेटिंग्स और विवरण |
---|---|
1 | bootstrap.servers दलालों की बूटस्ट्रैपिंग सूची। |
2 | group.id एक समूह के लिए एक व्यक्तिगत उपभोक्ता असाइन करता है। |
3 | enable.auto.commit यदि मान सत्य है, तो ऑफ़सेट के लिए ऑटो कमिट सक्षम करें, अन्यथा प्रतिबद्ध नहीं है। |
4 | auto.commit.interval.ms वापसी कितनी बार उपभोग किए गए ऑफ़सेट्स ज़ूकीपर को लिखे गए हैं। |
5 | session.timeout.ms इंगित करता है कि संदेश देने और जारी रखने के लिए कितने मिलीसेकंड काफ्का ज़ूकेपर के अनुरोध का जवाब देने (पढ़ने या लिखने) का इंतजार करेगा। |
SimpleConsumer आवेदन
निर्माता के आवेदन के चरण यहां समान हैं। सबसे पहले, अपने चिड़ियाघरकीपर और काफ्का दलाल को शुरू करें। फिर SimpleCon-sumer.java
नाम के जावा वर्ग के साथ एक SimpleConsumer
एप्लिकेशन बनाएं
और निम्न कोड टाइप करें।
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Compilation - एप्लिकेशन को निम्न कमांड का उपयोग करके संकलित किया जा सकता है।
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution − एप्लिकेशन को निम्न आदेश का उपयोग करके निष्पादित किया जा सकता है
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
Input- निर्माता CLI खोलें और विषय पर कुछ संदेश भेजें। आप 'हेलो कंज्यूमर' के रूप में स्माइल इनपुट डाल सकते हैं।
Output - निम्नलिखित आउटपुट होगा।
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
उपभोक्ता समूह कफका विषयों से बहु-थ्रेडेड या बहु-मशीन खपत है।
उपभोक्ता समूह
उपभोक्ता एक ही
group.id
का उपयोग करके एक समूह में शामिल हो सकते हैं।
एक समूह की अधिकतम समानता यह है कि समूह में उपभोक्ताओं की संख्या part विभाजन की संख्या नहीं है।
काफ्का एक समूह में उपभोक्ता को एक विषय के विभाजन को असाइन करता है, ताकि प्रत्येक विभाजन समूह में ठीक एक उपभोक्ता द्वारा खपत हो।
काफ्का गारंटी देता है कि एक संदेश केवल समूह में किसी एकल उपभोक्ता द्वारा पढ़ा जाता है।
उपभोक्ता उस संदेश को देख सकते हैं जिस क्रम में उन्हें लॉग में संग्रहीत किया गया था।
एक उपभोक्ता का फिर से संतुलन
अधिक प्रक्रियाओं / थ्रेड्स को जोड़ने से काफ्का को फिर से संतुलन मिलेगा। यदि कोई भी उपभोक्ता या ब्रोकर दिल की धड़कन को चिड़ियाघर कीपर में भेजने में विफल रहता है, तो इसे काफ्का क्लस्टर के माध्यम से फिर से कॉन्फ़िगर किया जा सकता है। इस पुनः संतुलन के दौरान, काफ्का उपलब्ध थ्रेड्स को उपलब्ध विभाजन प्रदान करेगा, संभवतः एक विभाजन को दूसरी प्रक्रिया में ले जाएगा।
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
संकलन
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
क्रियान्वयन
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
यहां हमने दो उपभोक्ताओं के साथ मेरे समूह के
रूप में एक नमूना समूह नाम बनाया है । इसी तरह, आप समूह में अपने समूह और उपभोक्ताओं की संख्या बना सकते हैं।
इनपुट
निर्माता CLI खोलें और कुछ संदेश भेजें -
Test consumer group 01
Test consumer group 02
पहली प्रक्रिया का आउटपुट
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
दूसरी प्रक्रिया का आउटपुट
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
अब उम्मीद है कि आप Java क्लाइंट डेमो का उपयोग करके SimpleConsumer और ConsumeGroup समझ गए होंगे। अब आपके पास जावा क्लाइंट का उपयोग करके संदेश भेजने और प्राप्त करने के तरीके के बारे में एक विचार है। हमें अगले अध्याय में बड़ी डेटा तकनीकों के साथ काफ्का एकीकरण जारी रखना चाहिए।
इस अध्याय में, हम सीखेंगे कि अपाका स्टॉर्म के साथ काफ्का को कैसे एकीकृत किया जाए।
तूफान के बारे में
स्टॉर्म मूल रूप से बैकथन में नाथन मार्ज़ और टीम द्वारा बनाया गया था। थोड़े समय में, अपाचे स्टॉर्म वितरित वास्तविक समय प्रसंस्करण प्रणाली के लिए एक मानक बन गया जो आपको भारी मात्रा में डेटा संसाधित करने की अनुमति देता है। तूफान बहुत तेज़ है और एक बेंचमार्क ने इसे प्रति सेकंड प्रति नोडल संसाधित एक लाख टुपल्स पर देखा। अपाचे स्टॉर्म लगातार चलता है, कॉन्फ़िगर किए गए स्रोतों (स्पाउट्स) से डेटा का उपभोग करता है और प्रोसेसिंग पाइपलाइन (बोल्ट) के नीचे डेटा को पास करता है। कॉम-बाइन्ड, स्पाउट्स और बोल्ट्स एक टोपोलॉजी बनाते हैं।
तूफान के साथ एकीकरण
काफ्का और तूफान स्वाभाविक रूप से एक दूसरे के पूरक हैं, और उनका शक्तिशाली सहयोग तेजी से बढ़ते बड़े डेटा के लिए वास्तविक समय स्ट्रीमिंग एनालिटिक्स को सक्षम बनाता है। काफ्का और स्टॉर्म एकीकरण डेवलपर्स के लिए स्टॉर्म टोपोलॉजी से डेटा धाराओं को निगलना और प्रकाशित करना आसान बनाता है।
वैचारिक प्रवाह
टोंटी धाराओं का एक स्रोत है। उदाहरण के लिए, एक टोंटी एक काफ्का विषय से ट्यूपल्स को पढ़ सकती है और उन्हें एक धारा के रूप में उत्सर्जित कर सकती है। एक बोल्ट इनपुट धाराओं का उपयोग करता है, प्रक्रिया करता है और संभवतः नई धाराएं उत्सर्जित करता है। बोल्ट रनिंग फ़ंक्शंस, ट्यूपल्स को फ़िल्टर करने, स्ट्रीमिंग एग्रीगेशन, स्ट्रीमिंग जॉइनिंग, डेटाबेस से बात करने, और बहुत कुछ कर सकते हैं। एक तूफान टोपोलॉजी में प्रत्येक नोड समानांतर में निष्पादित होता है। एक टोपोलॉजी अनिश्चित काल तक चलती है जब तक आप इसे समाप्त नहीं करते। तूफान स्वचालित रूप से किसी भी असफल कार्यों को फिर से असाइन करेगा। इसके अतिरिक्त, तूफान गारंटी देता है कि कोई भी डेटा हानि नहीं होगी, भले ही मशीनें नीचे जाएं और संदेश गिराए जाएं।
आइए काफ्का-तूफान एकीकरण एपीआई के बारे में विस्तार से जानते हैं। स्टाफ़ के साथ काफ्का को एकीकृत करने के लिए तीन मुख्य वर्ग हैं। वे इस प्रकार हैं -
ब्रोकरहॉस्ट्स - ZkHosts & StaticHosts
ब्रोकरहोस्ट एक इंटरफ़ेस है और ZkHosts और StaticHosts इसके दो मुख्य कार्यान्वयन हैं। ZkHosts का उपयोग ज़ूकेर में विवरणों को बनाए रखने के द्वारा गतिशील रूप से काफ्का दलालों को ट्रैक करने के लिए किया जाता है, जबकि StaticHosts का उपयोग काफ्का दलालों और इसके विवरणों को मैन्युअल रूप से सेट करने के लिए किया जाता है। ZkHosts काफ्का ब्रोकर तक पहुंचने का सरल और तेज़ तरीका है।
ZkHosts का हस्ताक्षर इस प्रकार है -
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
जहाँ ब्रोकरZStStr ज़ूकेर होस्ट है और ब्रोकरज़कपैथ काफ्का ब्रोकर विवरण को बनाए रखने के लिए ज़ूकीपर पथ है।
काफ्काकोफिग एपीआई
कफका क्लस्टर के लिए कॉन्फ़िगरेशन सेटिंग्स को परिभाषित करने के लिए इस एपीआई का उपयोग किया जाता है। काफ्का कॉन-अंजीर के हस्ताक्षर को निम्नानुसार परिभाषित किया गया है
public KafkaConfig(BrokerHosts hosts, string topic)
Hosts - ब्रोकरहोस्ट्स ZkHosts / StaticHosts हो सकते हैं।
Topic - विषय का नाम।
SpoutConfig API
Spoutconfig KafkaConfig का एक विस्तार है जो अतिरिक्त ZooKeeper जानकारी का समर्थन करता है।
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
Hosts - ब्रोकरहॉस्ट्स ब्रोकरहॉस्ट्स इंटरफ़ेस का कोई भी कार्यान्वयन हो सकता है
Topic - विषय का नाम।
zkRoot - चिड़ियाघर कीपर रूट।
id −टोंटी ने ज़ुकाइपर में अपनी खपत के आधार पर राज्य को संग्रहीत किया। आईडी को विशिष्ट रूप से आपके टोंटी की पहचान करनी चाहिए।
SchemeAsMultiScheme
स्कीमएम्स मल्तिसेमे एक इंटरफ़ेस है जो यह बताता है कि कफ़्का से प्राप्त बाइटबफ़र कैसे तूफान तूफान में तब्दील हो जाता है। यह MultiScheme से लिया गया है और योजना वर्ग के कार्यान्वयन को स्वीकार करता है। स्कीम वर्ग के कार्यान्वयन के बहुत सारे हैं और एक ऐसा कार्यान्वयन है स्ट्रिंगस्चेम, जो एक साधारण स्ट्रिंग के रूप में बाइट को पार्स करता है। यह आपके आउटपुट फ़ील्ड के नामकरण को भी नियंत्रित करता है। हस्ताक्षर को निम्नानुसार परिभाषित किया गया है।
public SchemeAsMultiScheme(Scheme scheme)
Scheme - कफका से भस्म बफर।
KafkaSpout एपीआई
KafkaSpout हमारा टोंटी कार्यान्वयन है, जो स्टॉर्म के साथ एकीकृत होगा। यह काफ्का विषय से मेस-ऋषि प्राप्त करता है और इसे तूफान के रूप में तूफान पारिस्थितिकी तंत्र में फेंक देता है। KafkaSpout को SpoutConfig से इसके विन्यास-विवरण का विवरण मिलता है।
नीचे एक साधारण काफ्का टोंटी बनाने के लिए एक नमूना कोड है।
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
topicName, "/" + topicName UUID.randomUUID().toString());
//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
बोल्ट निर्माण
बोल्ट एक घटक है जो ट्यूपल्स को इनपुट के रूप में लेता है, टपल को संसाधित करता है, और आउटपुट के रूप में नए ट्यूपल का उत्पादन करता है। बोल्ट IRichBolt इंटरफ़ेस को लागू करेंगे। इस कार्यक्रम में, दो बोल्ट कक्षाएं WordSplitter-Bolt और WordCounterBolt का उपयोग ऑपरेशन करने के लिए किया जाता है।
IRichBolt इंटरफ़ेस के निम्नलिखित तरीके हैं -
Prepare- निष्पादित करने के लिए पर्यावरण के साथ बोल्ट प्रदान करता है। निष्पादक इस पद्धति को टोंटी को आरंभ करने के लिए चलाएगा।
Execute - इनपुट के एकल टपल की प्रक्रिया करें।
Cleanup - जब एक बोल्ट बंद होने जा रहा है तो कॉल किया गया।
declareOutputFields - टपल के आउटपुट स्कीमा की घोषणा करता है।
स्प्लिटबोल्ट.जवा बनाते हैं, जो शब्दों में वाक्य को विभाजित करने के लिए तर्क को लागू करता है और काउंटबोल्ट.जवा, जो अद्वितीय शब्दों को अलग करने के लिए तर्क को लागू करता है और इसकी घटना को गिनता है।
SplitBolt.java
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word: words) {
word = word.trim();
if(!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void cleanup() {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
CountBolt.java
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
टोपोलॉजी के लिए प्रस्तुत करना
स्टॉर्म टोपोलॉजी मूल रूप से एक थ्रिफ्ट संरचना है। TopologyBuilder वर्ग जटिल टोपोलॉजी बनाने के लिए सरल और आसान तरीके प्रदान करता है। TopologyBuilder वर्ग में टोंटी (सेटस्पाउट) और बोल्ट (सेटबोल्ट) सेट करने की विधियाँ हैं। अंत में, टोपोलॉजीब्युल्टोलॉजी ने टोपोलॉजी को क्रियोलॉजी बनाने के लिए बनाया है। शफ़लग्रुपिंग और फ़ील्ड्सग्रुपिंग विधियाँ टोंटी और बोल्ट के लिए धारा समूह निर्धारण में मदद करती हैं।
Local Cluster- विकास के उद्देश्यों के लिए, हम LocalCluster
ऑब्जेक्ट का उपयोग करके एक स्थानीय क्लस्टर बना सकते हैं और फिर LocalCluster
वर्ग की सबमिटटॉपोलॉजी
पद्धति का उपयोग करके टोपोलॉजी जमा कर सकते हैं
।
KafkaStormSample.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "my-first-topic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());
Thread.sleep(10000);
cluster.shutdown();
}
}
संकलन को आगे बढ़ाने से पहले, काका-स्टॉर्म एकीकरण को क्यूरेटर ज़ूकिपर क्लाइंट जावा लाइब्रेरी की आवश्यकता है। क्यूरेटर संस्करण 2.9.1 समर्थन अपाचे स्टॉर्म संस्करण 0.9.5 (जो हम इस ट्यूटोरियल में उपयोग करते हैं)। नीचे निर्दिष्ट जार फ़ाइलों को डाउन-लोड करें और इसे जावा क्लास पथ में रखें।
- curator-client-2.9.1.jar
- curator-framework-2.9.1.jar
निर्भरता फ़ाइलों को शामिल करने के बाद, निम्नलिखित कमांड का उपयोग करके प्रोग्राम को संकलित करें,
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
क्रियान्वयन
काफ्का निर्माता सीएलआई (पिछले अध्याय में समझाया गया) शुरू करें, मेरा पहला-विषय
नामक एक नया विषय बनाएं और नीचे दिखाए गए कुछ नमूना संदेश प्रदान करें -
hello
kafka
storm
spark
test message
another test message
अब निम्नलिखित कमांड का उपयोग करके एप्लिकेशन को निष्पादित करें -
java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample
इस एप्लिकेशन का नमूना आउटपुट नीचे निर्दिष्ट किया गया है -
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2
इस अध्याय में, हम स्पार्क स्ट्रीमिंग एपीआई के साथ अपाचे काफ्का को एकीकृत करने के तरीके के बारे में चर्चा करेंगे।
स्पार्क के बारे में
स्पार्क स्ट्रीमिंग एपीआई स्केलेबल, उच्च-थ्रूपुट, लाइव डेटा धाराओं के दोष-सहिष्णु स्ट्रीम प्रसंस्करण को सक्षम करता है। डेटा को काफ्का, फ्लूम, ट्विटर इत्यादि जैसे कई स्रोतों से प्राप्त किया जा सकता है, और जटिल एल्गोरिदम जैसे मानचित्र, कम, जुड़ने और खिड़की जैसे उच्च-स्तरीय कार्यों का उपयोग करके संसाधित किया जा सकता है। अंत में, संसाधित डेटा को फाइल सिस्टम, डेटाबेस और लाइव डैश-बोर्ड पर धकेल दिया जा सकता है। रेसिलिएंट डिस्ट्रिब्यूटेड डेटासेट्स (आरडीडी) स्पार्क की एक मूलभूत डेटा संरचना है। यह वस्तुओं का एक अपरिवर्तित वितरित संग्रह है। RDD में प्रत्येक डेटासेट को तार्किक विभाजन में विभाजित किया गया है, जिसे क्लस्टर के विभिन्न नोड्स पर गणना की जा सकती है।
स्पार्क के साथ एकीकरण
काफ्का स्पार्क स्ट्रीमिंग के लिए एक संभावित संदेश और एकीकरण मंच है। कफका डेटा की वास्तविक समय की धाराओं के लिए केंद्रीय केंद्र के रूप में कार्य करता है और स्पार्क स्ट्रीमिंग में जटिल एल्गोरिदम का उपयोग करके संसाधित किया जाता है। एक बार डेटा संसाधित होने के बाद, स्पार्क स्ट्रीमिंग एचडीएफएस, डेटाबेस या डैशबोर्ड में अभी तक किसी अन्य काफ़्का विषय या स्टोर में परिणाम प्रकाशित कर सकती है। निम्नलिखित चित्र में वैचारिक प्रवाह को दर्शाया गया है।
अब, हम काफ्का-स्पार्क एपीआई के बारे में विस्तार से जानते हैं।
स्पार्ककोन एपीआई
यह स्पार्क एप्लिकेशन के लिए कॉन्फ़िगरेशन का प्रतिनिधित्व करता है। विभिन्न स्पार्क मापदंडों को कुंजी-मूल्य जोड़े के रूप में सेट करने के लिए उपयोग किया जाता है।
SparkConf
वर्ग के निम्नलिखित तरीके हैं -
set(string key, string value) - कॉन्फ़िगरेशन चर सेट करें।
remove(string key) - कॉन्फ़िगरेशन से कुंजी निकालें।
setAppName(string name) - अपने आवेदन के लिए आवेदन नाम सेट करें।
get(string key) - कुंजी प्राप्त करें
StreamingContext API
यह स्पार्क कार्यक्षमता के लिए मुख्य प्रवेश बिंदु है। SparkContext एक स्पार्क क्लस्टर से कनेक्शन का प्रतिनिधित्व करता है, और इसका उपयोग क्लस्टर पर RDDs, संचायक और प्रसारण चर बनाने के लिए किया जा सकता है। हस्ताक्षर को नीचे दिखाए अनुसार परिभाषित किया गया है।
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master - कनेक्ट करने के लिए क्लस्टर URL (जैसे मेसोस: // होस्ट: पोर्ट, स्पार्क: // होस्ट: पोर्ट, लोकल [4])।
appName - क्लस्टर वेब UI पर प्रदर्शित करने के लिए आपकी नौकरी का एक नाम
batchDuration - समय अंतराल जिस पर स्ट्रीमिंग डेटा को बैचों में विभाजित किया जाएगा
public StreamingContext(SparkConf conf, Duration batchDuration)
नई स्पार्ककॉन्टेक्ट के लिए आवश्यक कॉन्फ़िगरेशन प्रदान करके एक स्ट्रीमिंग कॉन्टेक्स्ट बनाएं।
conf - स्पार्क पैरामीटर
batchDuration - समय अंतराल जिस पर स्ट्रीमिंग डेटा को बैचों में विभाजित किया जाएगा
काफ्काटिल्स एपीआई
KafkaUtils API का उपयोग काफ्का क्लस्टर को स्पार्क स्ट्रीमिंग से जोड़ने के लिए किया जाता है। इस एपीआई में नीचे की तरह परिभाषित साइन
-कैंट विधि createStream
हस्ताक्षर है।
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
ऊपर दी गई विधि का उपयोग एक इनपुट स्ट्रीम बनाने के लिए किया जाता है जो काफ्का ब्रोकर्स के संदेशों को खींचता है।
ssc - StreamingContext ऑब्जेक्ट।
zkQuorum - ज़ूकीपर कोरम।
groupId - इस उपभोक्ता के लिए ग्रुप आई.डी.
topics - उपभोग करने के लिए विषयों का एक नक्शा लौटाएं।
storageLevel - प्राप्त वस्तुओं के भंडारण के लिए उपयोग करने के लिए संग्रहण स्तर।
KafkaUtils API के पास एक और तरीका createDirectStream है, जिसका उपयोग एक इनपुट स्ट्रीम बनाने के लिए किया जाता है जो किसी भी रिसीवर का उपयोग किए बिना सीधे Kafka Brokers के संदेशों को खींचता है। यह धारा इस बात की गारंटी दे सकती है कि काफ़्का का प्रत्येक संदेश बिल्कुल एक बार परिवर्तनों में शामिल है।
नमूना आवेदन स्काला में किया जाता है। एप्लिकेशन को संकलित करने के लिए, कृपया sbt
, scala बिल्ड टूल ( मावेन के
समान) डाउनलोड और इंस्टॉल करें । मुख्य आवेदन कोड नीचे प्रस्तुत किया गया है।
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
स्क्रिप्ट बनाएँ
स्पार्क-काफ्का एकीकरण स्पार्क, स्पार्क स्ट्रीमिंग और स्पार्क कफका एकीकरण जार पर निर्भर करता है। एक नई फ़ाइल build.sbt
बनाएँ और एप्लिकेशन विवरण और उसकी निर्भरता निर्दिष्ट करें। एसबीटी
जबकि संकलन और आवेदन पैकिंग आवश्यक जार डाउनलोड करेगा।
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
संकलन / पैकेजिंग
एप्लिकेशन की जार फ़ाइल को संकलित करने और पैकेज करने के लिए निम्न कमांड चलाएँ। हमें एप्लिकेशन को चलाने के लिए जार फाइल को स्पार्क कंसोल में जमा करना होगा।
sbt package
स्पार्क के लिए प्रस्तुत करना
काफ्का निर्माता सीएलआई शुरू करें (पिछले अध्याय में समझाया गया है), मेरा पहला-विषय
नामक एक नया विषय बनाएं और नीचे दिखाए गए अनुसार कुछ नमूना संदेश प्रदान करें।
Another spark test message
स्पार्क कंसोल को एप्लिकेशन सबमिट करने के लिए निम्न कमांड चलाएँ।
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
इस एप्लिकेशन का नमूना आउटपुट नीचे दिखाया गया है।
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..
आइए हम नवीनतम ट्विटर फ़ीड और उसके हैशटैग प्राप्त करने के लिए वास्तविक समय एप्लिकेशन का विश्लेषण करें। इससे पहले हमने काफ्का के साथ स्टॉर्म और स्पार्क का एकीकरण देखा है। दोनों परिदृश्यों में, हमने काफ्का पारिस्थितिकी तंत्र को संदेश भेजने के लिए एक काफ्का निर्माता (सीएलआई का उपयोग करके) बनाया। फिर, तूफ़ान और चिंगारी पूर्णांक में कफ़्का उपभोक्ता का उपयोग करके संदेशों को पढ़ता है और इसे क्रमशः तूफान और स्पार्क पारिस्थितिकी तंत्र में इंजेक्ट करता है। तो, व्यावहारिक रूप से हमें एक काफ्का निर्माता बनाने की आवश्यकता है, जो चाहिए -
- "ट्विटर स्ट्रीमिंग एपीआई" का उपयोग करके ट्विटर फ़ीड पढ़ें,
- फ़ीड की प्रक्रिया करें,
- हैशटैग निकालें और
- इसे काफ्का को भेजें।
एक बार जब हैशटैग
काफ्का, तूफान से प्राप्त कर रहे हैं / स्पार्क एकीकरण सूचनाओं-एनीमेशन प्राप्त करते हैं और तूफान / स्पार्क पारिस्थितिकी तंत्र को भेजें।
ट्विटर स्ट्रीमिंग एपीआई
"ट्विटर स्ट्रीमिंग एपीआई" को किसी भी प्रोग्रामिंग भाषा में एक्सेस किया जा सकता है। "Twitter4j" एक खुला स्रोत है, अनौपचारिक जावा पुस्तकालय, जो आसानी से "ट्विटर स्ट्रीमिंग एपीआई" तक पहुंचने के लिए जावा आधारित मॉड्यूल प्रदान करता है। "Twitter4j" ट्वीट्स को एक्सेस करने के लिए एक श्रोता आधारित रूपरेखा प्रदान करता है। "ट्विटर स्ट्रीमिंग एपीआई" का उपयोग करने के लिए, हमें ट्विटर डेवलपर खाते के लिए साइन इन करना होगा और निम्नलिखित प्राप्त करना चाहिएOAuth प्रमाणीकरण विवरण।
- Customerkey
- CustomerSecret
- AccessToken
- AccessTookenSecret
डेवलपर अकाउंट बन जाने के बाद, “twitter4j” जार फ़ाइलों को डाउनलोड करें और इसे जावा क्लास पथ में रखें।
पूरा ट्विटर काफ्का निर्माता कोडिंग (KafkaTwitterProducer.java) नीचे सूचीबद्ध है -
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.*;
import twitter4j.conf.*;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaTwitterProducer {
public static void main(String[] args) throws Exception {
LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
if(args.length < 5){
System.out.println(
"Usage: KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret> <twitter-access-token>
<twitter-access-token-secret>
<topic-name> <twitter-search-keywords>");
return;
}
String consumerKey = args[0].toString();
String consumerSecret = args[1].toString();
String accessToken = args[2].toString();
String accessTokenSecret = args[3].toString();
String topicName = args[4].toString();
String[] arguments = args.clone();
String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(consumerKey)
.setOAuthConsumerSecret(consumerSecret)
.setOAuthAccessToken(accessToken)
.setOAuthAccessTokenSecret(accessTokenSecret);
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
// System.out.println("@" + status.getUser().getScreenName()
+ " - " + status.getText());
// System.out.println("@" + status.getUser().getScreen-Name());
/*for(URLEntity urle : status.getURLEntities()) {
System.out.println(urle.getDisplayURL());
}*/
/*for(HashtagEntity hashtage : status.getHashtagEntities()) {
System.out.println(hashtage.getText());
}*/
}
@Override
public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
// System.out.println("Got a status deletion notice id:"
+ statusDeletionNotice.getStatusId());
}
@Override
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
// System.out.println("Got track limitation notice:" +
num-berOfLimitedStatuses);
}
@Override
public void onScrubGeo(long userId, long upToStatusId) {
// System.out.println("Got scrub_geo event userId:" + userId +
"upToStatusId:" + upToStatusId);
}
@Override
public void onStallWarning(StallWarning warning) {
// System.out.println("Got stall warning:" + warning);
}
@Override
public void onException(Exception ex) {
ex.printStackTrace();
}
};
twitterStream.addListener(listener);
FilterQuery query = new FilterQuery().track(keyWords);
twitterStream.filter(query);
Thread.sleep(5000);
//Add Kafka producer config settings
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int i = 0;
int j = 0;
while(i < 10) {
Status ret = queue.poll();
if (ret == null) {
Thread.sleep(100);
i++;
}else {
for(HashtagEntity hashtage : ret.getHashtagEntities()) {
System.out.println("Hashtag: " + hashtage.getText());
producer.send(new ProducerRecord<String, String>(
top-icName, Integer.toString(j++), hashtage.getText()));
}
}
}
producer.close();
Thread.sleep(5000);
twitterStream.shutdown();
}
}
संकलन
निम्नलिखित कमांड का उपयोग करके एप्लिकेशन को संकलित करें -
javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java
क्रियान्वयन
दो कंसोल खोलें। ऊपर संकलित एप्लिकेशन को चलाएं जैसा कि नीचे एक कंसोल में दिखाया गया है।
java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food
स्पार्क / स्टॉर्म के किसी एक अनुप्रयोग को पिछले अध्याय में एक और जीत-दहेज में चलाएं। ध्यान देने वाली मुख्य बात यह है कि उपयोग किया जाने वाला विषय दोनों ही मामलों में समान होना चाहिए। यहाँ, हमने "my-first-topic" विषय नाम के रूप में उपयोग किया है।
उत्पादन
इस एप्लिकेशन का आउटपुट कीवर्ड और ट्विटर के वर्तमान फीड पर निर्भर करेगा। एक नमूना आउटपुट नीचे निर्दिष्ट किया गया है (तूफान एकीकरण)।
. . .
food : 1
foodie : 2
burger : 1
. . .
काफ्का उपकरण “org.apache.kafka.tools। * के तहत पैक किया गया। उपकरण को सिस्टम टूल और प्रतिकृति टूल में वर्गीकृत किया गया है।
तंत्र उपकरण
सिस्टम टूल्स रन क्लास स्क्रिप्ट का उपयोग करके कमांड लाइन से चलाया जा सकता है। वाक्य विन्यास इस प्रकार है -
bin/kafka-run-class.sh package.class - - options
सिस्टम टूल्स में से कुछ नीचे दिए गए हैं -
Kafka Migration Tool - इस टूल का इस्तेमाल ब्रोकर को एक वर्जन से दूसरे वर्जन पर माइग्रेट करने के लिए किया जाता है।
Mirror Maker - इस उपकरण का उपयोग एक काफ्का क्लस्टर की मिररिंग से दूसरे को प्रदान करने के लिए किया जाता है।
Consumer Offset Checker - यह उपकरण विषय और उपभोक्ता समूह के निर्दिष्ट सेट के लिए उपभोक्ता समूह, विषय, विभाजन, ऑफ-सेट, लॉगसाइज़, मालिक प्रदर्शित करता है।
प्रतिकृति उपकरण
काफ्का प्रतिकृति एक उच्च स्तरीय डिजाइन उपकरण है। प्रतिकृति उपकरण जोड़ने का उद्देश्य मजबूत स्थायित्व और उच्च उपलब्धता के लिए है। कुछ प्रतिकृति उपकरण नीचे दिए गए हैं -
Create Topic Tool - यह डिफ़ॉल्ट विभाजन, प्रतिकृति कारक के साथ एक विषय बनाता है और प्रतिकृति असाइनमेंट करने के लिए काफ्का की डिफ़ॉल्ट योजना का उपयोग करता है।
List Topic Tool- यह उपकरण विषयों की दी गई सूची के लिए सूचना को सूचीबद्ध करता है। यदि कमांड लाइन में कोई विषय प्रदान नहीं किया जाता है, तो टूल सभी प्रश्नों को प्राप्त करने के लिए ज़ुकाइपर से पूछताछ करता है और उनके लिए जानकारी सूचीबद्ध करता है। उपकरण प्रदर्शित करने वाले क्षेत्र विषय का नाम, विभाजन, नेता, प्रतिकृतियां, आईएसआर हैं।
Add Partition Tool- किसी विषय का निर्माण, विषय के विभाजन की संख्या को निर्दिष्ट करना होगा। बाद में, विषय के लिए और अधिक विभाजन की आवश्यकता हो सकती है, जब विषय की मात्रा बढ़ जाएगी। यह उपकरण एक विशिष्ट विषय के लिए अधिक विभाजन जोड़ने में मदद करता है और जोड़े गए विभाजन के मैनुअल प्रतिकृति असाइनमेंट की भी अनुमति देता है।
काफ्का आज के कई बेहतरीन औद्योगिक अनुप्रयोगों का समर्थन करता है। हम इस अध्याय में काफ्का के कुछ सबसे उल्लेखनीय अनुप्रयोगों का एक संक्षिप्त विवरण प्रदान करेंगे।
ट्विटर
ट्विटर एक ऑनलाइन सोशल नेटवर्किंग सेवा है जो उपयोगकर्ता के ट्वीट भेजने और प्राप्त करने के लिए एक मंच प्रदान करती है। पंजीकृत उपयोगकर्ता ट्वीट पढ़ और पोस्ट कर सकते हैं, लेकिन अपंजीकृत उपयोगकर्ता केवल ट्वीट पढ़ सकते हैं। ट्विटर स्टॉर्म-काफ्का को उनके स्ट्रीम प्रोसेसिंग इन्फ्रास्ट्रक्चर के एक भाग के रूप में उपयोग करता है।
लिंक्डइन
Apache Kafka का उपयोग लिंक्डइन पर गतिविधि स्ट्रीम डेटा और ऑपरेशनल मेट्रिक्स के लिए किया जाता है। काफ्का मेस-सीजिंग सिस्टम ऑनलाइन संदेश की खपत के लिए लिंक्डइन न्यूज़फ़ीड, लिंक्डइन जैसे विभिन्न उत्पादों के साथ लिंक्डइन में मदद करता है और Hadoop जैसे ऑफ़लाइन विश्लेषिकी प्रणालियों के अलावा। लिंक्डइन के संबंध में काफ्का की मजबूत स्थायित्व भी प्रमुख कारकों में से एक है।
Netflix
नेटफ्लिक्स ऑन-डिमांड इंटरनेट स्ट्रीमिंग मीडिया का एक अमेरिकी बहुराष्ट्रीय प्रदाता है। नेटफ्लिक्स वास्तविक समय की निगरानी और घटना प्रसंस्करण के लिए काफ्का का उपयोग करता है।
mozilla
मोज़िला एक फ्री-सॉफ्टवेयर समुदाय है, जिसे 1998 में नेटस्केप के सदस्यों द्वारा बनाया गया था। काफ़्का जल्द ही टेलीमेट्री, टेस्ट पायलट, आदि जैसी परियोजनाओं के लिए अंतिम-उपयोगकर्ता के ब्राउज़र से प्रदर्शन और उपयोग डेटा एकत्र करने के लिए मोज़िला वर्तमान उत्पादन प्रणाली के एक हिस्से की जगह लेगा।
आकाशवाणी
Oracle अपने एंटरप्राइज़ सर्विस बस उत्पाद से OSB (Oracle Service Bus) नामक काफ़्का को मूल कनेक्टिविटी प्रदान करता है, जो डेवलपर्स को चरणबद्ध डेटा पाइपलाइनों को लागू करने के लिए OSB अंतर्निहित मध्यस्थता क्षमताओं का लाभ उठाने की अनुमति देता है।