Apache Presto - Đầu nối KAFKA

Kafka Connector cho Presto cho phép truy cập dữ liệu từ Apache Kafka bằng Presto.

Điều kiện tiên quyết

Tải xuống và cài đặt phiên bản mới nhất của các dự án Apache sau.

  • Apache ZooKeeper
  • Apache Kafka

Khởi động ZooKeeper

Khởi động máy chủ ZooKeeper bằng lệnh sau.

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Bây giờ, ZooKeeper bắt đầu cổng vào năm 2181.

Khởi động Kafka

Khởi động Kafka trong một thiết bị đầu cuối khác bằng lệnh sau.

$ bin/kafka-server-start.sh config/server.properties

Sau khi kafka khởi động, nó sử dụng số cổng 9092.

Dữ liệu TPCH

Tải xuống tpch-kafka

$  curl -o kafka-tpch 
https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_ 
0811-1.0.sh

Bây giờ bạn đã tải xuống trình tải từ Maven central bằng lệnh trên. Bạn sẽ nhận được một phản hồi tương tự như sau.

% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current 
                                 Dload  Upload   Total   Spent    Left  Speed 
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0  
  5 21.6M    5 1279k    0     0  83898      0  0:04:30  0:00:15  0:04:15  129k
  6 21.6M    6 1407k    0     0  86656      0  0:04:21  0:00:16  0:04:05  131k  
 24 21.6M   24 5439k    0     0   124k      0  0:02:57  0:00:43  0:02:14  175k 
 24 21.6M   24 5439k    0     0   124k      0  0:02:58  0:00:43  0:02:15  160k 
 25 21.6M   25 5736k    0     0   128k      0  0:02:52  0:00:44  0:02:08  181k 
 ………………………..

Sau đó, làm cho nó có thể thực thi được bằng lệnh sau,

$ chmod 755 kafka-tpch

Chạy tpch-kafka

Chạy chương trình kafka-tpch để tải trước một số chủ đề với dữ liệu tpch bằng lệnh sau.

Truy vấn

$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny

Kết quả

2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging 
to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand
Processing tables: [customer, orders, lineitem, part, partsupp, supplier,
nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1
de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2
de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3
de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4
de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
………………………
……………………….

Giờ đây, khách hàng, đơn đặt hàng, nhà cung cấp, v.v. của Kafka được tải bằng tpch.

Thêm cài đặt cấu hình

Hãy thêm cài đặt cấu hình trình kết nối Kafka sau trên máy chủ Presto.

connector.name = kafka  

kafka.nodes = localhost:9092  

kafka.table-names = tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp, 
tpch.supplier,tpch.nation,tpch.region  

kafka.hide-internal-columns = false

Trong cấu hình trên, các bảng Kafka được tải bằng chương trình Kafka-tpch.

Bắt đầu Presto CLI

Khởi động Presto CLI bằng lệnh sau,

$ ./presto --server localhost:8080 --catalog kafka —schema tpch;

Đây “tpch" là một lược đồ cho trình kết nối Kafka và bạn sẽ nhận được phản hồi như sau.

presto:tpch>

Bảng liệt kê

Truy vấn sau liệt kê tất cả các bảng trong “tpch” lược đồ.

Truy vấn

presto:tpch> show tables;

Kết quả

Table 
---------- 
 customer 
 lineitem 
 nation 
 orders
 part 
 partsupp 
 region 
 supplier

Mô tả bảng khách hàng

Truy vấn sau đây mô tả “customer” bàn.

Truy vấn

presto:tpch> describe customer;

Kết quả

Column           |  Type   |                   Comment 
-------------------+---------+--------------------------------------------- 
 _partition_id     | bigint  | Partition Id 
 _partition_offset | bigint  | Offset for the message within the partition 
 _segment_start    | bigint  | Segment start offset 
 _segment_end      | bigint  | Segment end offset 
 _segment_count    | bigint  | Running message count per segment 
 _key              | varchar | Key text 
 _key_corrupt      | boolean | Key data is corrupt 
 _key_length       | bigint  | Total number of key bytes 
 _message          | varchar | Message text 
 _message_corrupt  | boolean | Message data is corrupt 
 _message_length   | bigint  | Total number of message bytes