अपाचे प्रेस्टो - KAFKA कनेक्टर

प्रेस्टो के लिए काफ्का कनेक्टर प्रेस्टो का उपयोग करके अपाचे काफ्का से डेटा का उपयोग करने की अनुमति देता है।

आवश्यक शर्तें

निम्नलिखित अपाचे परियोजनाओं के नवीनतम संस्करण को डाउनलोड और इंस्टॉल करें।

  • अपाचे चिड़ियाघर कीपर
  • अपाचे काफ्का

ZooKeeper प्रारंभ करें

निम्न कमांड का उपयोग करके ZooKeeper सर्वर शुरू करें।

$ bin/zookeeper-server-start.sh config/zookeeper.properties

अब, ज़ूकीपर 2181 में पोर्ट शुरू करता है।

काफ्का शुरू करें

निम्न आदेश का उपयोग करके दूसरे टर्मिनल में काफ्का प्रारंभ करें।

$ bin/kafka-server-start.sh config/server.properties

काफ्का शुरू होने के बाद, यह पोर्ट नंबर 9092 का उपयोग करता है।

TPCH डेटा

Tpch-kafka डाउनलोड करें

$  curl -o kafka-tpch 
https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_ 
0811-1.0.sh

अब आपने उपरोक्त कमांड का उपयोग करके लोडर को मावेन सेंट्रल से डाउनलोड किया है। आपको निम्नलिखित के समान प्रतिक्रिया मिलेगी।

% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current 
                                 Dload  Upload   Total   Spent    Left  Speed 
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0  
  5 21.6M    5 1279k    0     0  83898      0  0:04:30  0:00:15  0:04:15  129k
  6 21.6M    6 1407k    0     0  86656      0  0:04:21  0:00:16  0:04:05  131k  
 24 21.6M   24 5439k    0     0   124k      0  0:02:57  0:00:43  0:02:14  175k 
 24 21.6M   24 5439k    0     0   124k      0  0:02:58  0:00:43  0:02:15  160k 
 25 21.6M   25 5736k    0     0   128k      0  0:02:52  0:00:44  0:02:08  181k 
 ………………………..

फिर, निम्नलिखित कमांड का उपयोग करके इसे निष्पादन योग्य बनाएं,

$ chmod 755 kafka-tpch

Tpch-kafka चलाएं

निम्नलिखित कमांड का उपयोग करके tchch डेटा के साथ कई विषयों को प्रीलोड करने के लिए kafka-tpch प्रोग्राम चलाएं।

सवाल

$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny

परिणाम

2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging 
to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand
Processing tables: [customer, orders, lineitem, part, partsupp, supplier,
nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1
de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2
de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3
de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4
de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
………………………
……………………….

अब, काफ्का तालिकाओं के ग्राहक, ऑर्डर, सप्लायर आदि को tpch का उपयोग करके लोड किया जाता है।

कॉन्फ़िगरेशन सेटिंग्स जोड़ें

आइए Presto सर्वर पर निम्न काफ्का कनेक्टर कॉन्फ़िगरेशन सेटिंग्स जोड़ें।

connector.name = kafka  

kafka.nodes = localhost:9092  

kafka.table-names = tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp, 
tpch.supplier,tpch.nation,tpch.region  

kafka.hide-internal-columns = false

उपरोक्त कॉन्फ़िगरेशन में, काफ्का-टेबल प्रोग्राम का उपयोग करके काफ्का तालिकाओं को लोड किया जाता है।

प्रेस्टो सीएलआई शुरू करें

निम्नलिखित कमांड का उपयोग करके प्रेस्टो सीएलआई शुरू करें,

$ ./presto --server localhost:8080 --catalog kafka —schema tpch;

यहाँ “tpch" काफ्का कनेक्टर के लिए एक स्कीमा है और आपको निम्न के रूप में एक प्रतिक्रिया प्राप्त होगी।

presto:tpch>

सूची सारणी

निम्नलिखित क्वेरी में सभी तालिकाओं को सूचीबद्ध किया गया है “tpch” स्कीमा।

सवाल

presto:tpch> show tables;

परिणाम

Table 
---------- 
 customer 
 lineitem 
 nation 
 orders
 part 
 partsupp 
 region 
 supplier

ग्राहक तालिका का वर्णन करें

निम्नलिखित क्वेरी का वर्णन करता है “customer” तालिका।

सवाल

presto:tpch> describe customer;

परिणाम

Column           |  Type   |                   Comment 
-------------------+---------+--------------------------------------------- 
 _partition_id     | bigint  | Partition Id 
 _partition_offset | bigint  | Offset for the message within the partition 
 _segment_start    | bigint  | Segment start offset 
 _segment_end      | bigint  | Segment end offset 
 _segment_count    | bigint  | Running message count per segment 
 _key              | varchar | Key text 
 _key_corrupt      | boolean | Key data is corrupt 
 _key_length       | bigint  | Total number of key bytes 
 _message          | varchar | Message text 
 _message_corrupt  | boolean | Message data is corrupt 
 _message_length   | bigint  | Total number of message bytes