Apache Presto - ตัวเชื่อมต่อ KAFKA

Kafka Connector สำหรับ Presto อนุญาตให้เข้าถึงข้อมูลจาก Apache Kafka โดยใช้ Presto

ข้อกำหนดเบื้องต้น

ดาวน์โหลดและติดตั้งเวอร์ชันล่าสุดของโครงการ Apache ต่อไปนี้

  • Apache ZooKeeper
  • อาปาเช่คาฟคา

เริ่ม ZooKeeper

เริ่มเซิร์ฟเวอร์ ZooKeeper โดยใช้คำสั่งต่อไปนี้

$ bin/zookeeper-server-start.sh config/zookeeper.properties

ตอนนี้ ZooKeeper เริ่มพอร์ตในปี 2181

เริ่ม Kafka

เริ่ม Kafka ในเทอร์มินัลอื่นโดยใช้คำสั่งต่อไปนี้

$ bin/kafka-server-start.sh config/server.properties

หลังจากเริ่มต้นคาฟคาจะใช้พอร์ตหมายเลข 9092

ข้อมูล TPCH

ดาวน์โหลด tpch-kafka

$  curl -o kafka-tpch 
https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_ 
0811-1.0.sh

ตอนนี้คุณได้ดาวน์โหลดตัวโหลดจาก Maven central โดยใช้คำสั่งด้านบน คุณจะได้รับคำตอบที่คล้ายกันดังต่อไปนี้

% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current 
                                 Dload  Upload   Total   Spent    Left  Speed 
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0  
  5 21.6M    5 1279k    0     0  83898      0  0:04:30  0:00:15  0:04:15  129k
  6 21.6M    6 1407k    0     0  86656      0  0:04:21  0:00:16  0:04:05  131k  
 24 21.6M   24 5439k    0     0   124k      0  0:02:57  0:00:43  0:02:14  175k 
 24 21.6M   24 5439k    0     0   124k      0  0:02:58  0:00:43  0:02:15  160k 
 25 21.6M   25 5736k    0     0   128k      0  0:02:52  0:00:44  0:02:08  181k 
 ………………………..

จากนั้นให้เรียกใช้งานได้โดยใช้คำสั่งต่อไปนี้

$ chmod 755 kafka-tpch

เรียกใช้ tpch-kafka

รันโปรแกรม kafka-tpch เพื่อโหลดหัวข้อต่างๆล่วงหน้าด้วยข้อมูล tpch โดยใช้คำสั่งต่อไปนี้

แบบสอบถาม

$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny

ผลลัพธ์

2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging 
to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand
Processing tables: [customer, orders, lineitem, part, partsupp, supplier,
nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1
de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2
de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3
de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4
de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
………………………
……………………….

ตอนนี้ลูกค้าตารางคาฟคาคำสั่งซัพพลายเออร์ ฯลฯ ถูกโหลดโดยใช้ tpch

เพิ่ม Config Settings

เพิ่มการตั้งค่าคอนฟิกคอนฟิกตัวเชื่อมต่อ Kafka ต่อไปนี้บนเซิร์ฟเวอร์ Presto

connector.name = kafka  

kafka.nodes = localhost:9092  

kafka.table-names = tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp, 
tpch.supplier,tpch.nation,tpch.region  

kafka.hide-internal-columns = false

ในการกำหนดค่าข้างต้นตาราง Kafka จะโหลดโดยใช้โปรแกรม Kafka-tpch

เริ่ม Presto CLI

เริ่ม Presto CLI โดยใช้คำสั่งต่อไปนี้

$ ./presto --server localhost:8080 --catalog kafka —schema tpch;

ที่นี่ “tpch" เป็นสคีมาสำหรับตัวเชื่อมต่อ Kafka และคุณจะได้รับคำตอบดังต่อไปนี้

presto:tpch>

รายการตาราง

แบบสอบถามต่อไปนี้แสดงรายการตารางทั้งหมดใน “tpch” สคีมา

แบบสอบถาม

presto:tpch> show tables;

ผลลัพธ์

Table 
---------- 
 customer 
 lineitem 
 nation 
 orders
 part 
 partsupp 
 region 
 supplier

อธิบายตารางลูกค้า

แบบสอบถามต่อไปนี้อธิบาย “customer” ตาราง.

แบบสอบถาม

presto:tpch> describe customer;

ผลลัพธ์

Column           |  Type   |                   Comment 
-------------------+---------+--------------------------------------------- 
 _partition_id     | bigint  | Partition Id 
 _partition_offset | bigint  | Offset for the message within the partition 
 _segment_start    | bigint  | Segment start offset 
 _segment_end      | bigint  | Segment end offset 
 _segment_count    | bigint  | Running message count per segment 
 _key              | varchar | Key text 
 _key_corrupt      | boolean | Key data is corrupt 
 _key_length       | bigint  | Total number of key bytes 
 _message          | varchar | Message text 
 _message_corrupt  | boolean | Message data is corrupt 
 _message_length   | bigint  | Total number of message bytes