अपाचे प्रेस्टो - KAFKA कनेक्टर
प्रेस्टो के लिए काफ्का कनेक्टर प्रेस्टो का उपयोग करके अपाचे काफ्का से डेटा का उपयोग करने की अनुमति देता है।
आवश्यक शर्तें
निम्नलिखित अपाचे परियोजनाओं के नवीनतम संस्करण को डाउनलोड और इंस्टॉल करें।
- अपाचे चिड़ियाघर कीपर
- अपाचे काफ्का
ZooKeeper प्रारंभ करें
निम्न कमांड का उपयोग करके ZooKeeper सर्वर शुरू करें।
$ bin/zookeeper-server-start.sh config/zookeeper.properties
अब, ज़ूकीपर 2181 में पोर्ट शुरू करता है।
काफ्का शुरू करें
निम्न आदेश का उपयोग करके दूसरे टर्मिनल में काफ्का प्रारंभ करें।
$ bin/kafka-server-start.sh config/server.properties
काफ्का शुरू होने के बाद, यह पोर्ट नंबर 9092 का उपयोग करता है।
TPCH डेटा
Tpch-kafka डाउनलोड करें
$ curl -o kafka-tpch
अब आपने उपरोक्त कमांड का उपयोग करके लोडर को मावेन सेंट्रल से डाउनलोड किया है। आपको निम्नलिखित के समान प्रतिक्रिया मिलेगी।
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- 0:00:01 --:--:-- 0
5 21.6M 5 1279k 0 0 83898 0 0:04:30 0:00:15 0:04:15 129k
6 21.6M 6 1407k 0 0 86656 0 0:04:21 0:00:16 0:04:05 131k
24 21.6M 24 5439k 0 0 124k 0 0:02:57 0:00:43 0:02:14 175k
24 21.6M 24 5439k 0 0 124k 0 0:02:58 0:00:43 0:02:15 160k
25 21.6M 25 5736k 0 0 128k 0 0:02:52 0:00:44 0:02:08 181k
फिर, निम्नलिखित कमांड का उपयोग करके इसे निष्पादन योग्य बनाएं,
$ chmod 755 kafka-tpch
Tpch-kafka चलाएं
निम्नलिखित कमांड का उपयोग करके tchch डेटा के साथ कई विषयों को प्रीलोड करने के लिए kafka-tpch प्रोग्राम चलाएं।
$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging
to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand
Processing tables: [customer, orders, lineitem, part, partsupp, supplier,
nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1
de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2
de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3
de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4
de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
अब, काफ्का तालिकाओं के ग्राहक, ऑर्डर, सप्लायर आदि को tpch का उपयोग करके लोड किया जाता है।
कॉन्फ़िगरेशन सेटिंग्स जोड़ें
आइए Presto सर्वर पर निम्न काफ्का कनेक्टर कॉन्फ़िगरेशन सेटिंग्स जोड़ें।
connector.name = kafka
kafka.nodes = localhost:9092
kafka.table-names = tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,
kafka.hide-internal-columns = false
उपरोक्त कॉन्फ़िगरेशन में, काफ्का-टेबल प्रोग्राम का उपयोग करके काफ्का तालिकाओं को लोड किया जाता है।
प्रेस्टो सीएलआई शुरू करें
निम्नलिखित कमांड का उपयोग करके प्रेस्टो सीएलआई शुरू करें,
$ ./presto --server localhost:8080 --catalog kafka —schema tpch;
यहाँ “tpch" काफ्का कनेक्टर के लिए एक स्कीमा है और आपको निम्न के रूप में एक प्रतिक्रिया प्राप्त होगी।
सूची सारणी
निम्नलिखित क्वेरी में सभी तालिकाओं को सूचीबद्ध किया गया है “tpch” स्कीमा।
presto:tpch> show tables;
ग्राहक तालिका का वर्णन करें
निम्नलिखित क्वेरी का वर्णन करता है “customer” तालिका।
presto:tpch> describe customer;
Column | Type | Comment
_partition_id | bigint | Partition Id
_partition_offset | bigint | Offset for the message within the partition
_segment_start | bigint | Segment start offset
_segment_end | bigint | Segment end offset
_segment_count | bigint | Running message count per segment
_key | varchar | Key text
_key_corrupt | boolean | Key data is corrupt
_key_length | bigint | Total number of key bytes
_message | varchar | Message text
_message_corrupt | boolean | Message data is corrupt
_message_length | bigint | Total number of message bytes