Apache Kafka - การทำงานขั้นพื้นฐาน

ขั้นแรกให้เราเริ่มใช้การกำหนดค่าโบรกเกอร์โหนดเดียวจากนั้นเราจะย้ายการตั้งค่าของเราไปยังการกำหนดค่าโบรกเกอร์โหนดเดียวหลายรายการ

หวังว่าคุณจะติดตั้ง Java, ZooKeeper และ Kafka บนเครื่องของคุณในตอนนี้ ก่อนที่จะย้ายไปที่ Kafka Cluster Setup ก่อนอื่นคุณต้องเริ่ม ZooKeeper ของคุณเนื่องจาก Kafka Cluster ใช้ ZooKeeper

เริ่ม ZooKeeper

เปิดเทอร์มินัลใหม่และพิมพ์คำสั่งต่อไปนี้ -

bin/zookeeper-server-start.sh config/zookeeper.properties

ในการเริ่ม Kafka Broker ให้พิมพ์คำสั่งต่อไปนี้ -

bin/kafka-server-start.sh config/server.properties

หลังจากเริ่ม Kafka Broker พิมพ์คำสั่งjpsบนเทอร์มินัล ZooKeeper และคุณจะเห็นคำตอบต่อไปนี้ -

821 QuorumPeerMain
928 Kafka
931 Jps

ตอนนี้คุณสามารถเห็นภูตสองตัวที่ทำงานบนเทอร์มินัลโดยที่ QuorumPeerMain คือ ZooKeeper daemon และอีกตัวคือ Kafka daemon

การกำหนดค่าโบรกเกอร์โหนดเดียว - เดี่ยว

ในการกำหนดค่านี้คุณมีอินสแตนซ์ ZooKeeper และรหัสนายหน้าเพียงชุดเดียว ต่อไปนี้เป็นขั้นตอนในการกำหนดค่า -

Creating a Kafka Topic- Kafka มียูทิลิตี้บรรทัดคำสั่งชื่อkafka-topics.shเพื่อสร้างหัวข้อบนเซิร์ฟเวอร์ เปิดเทอร์มินัลใหม่และพิมพ์ตัวอย่างด้านล่าง

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

เราเพิ่งสร้างหัวข้อชื่อHello-Kafkaด้วยพาร์ติชันเดียวและตัวประกอบการจำลองหนึ่งตัว ผลลัพธ์ที่สร้างขึ้นข้างต้นจะคล้ายกับผลลัพธ์ต่อไปนี้ -

Output- สร้างหัวข้อHello-Kafka

เมื่อสร้างหัวข้อแล้วคุณจะได้รับการแจ้งเตือนในหน้าต่างเทอร์มินัลโบรกเกอร์ Kafka และบันทึกสำหรับหัวข้อที่สร้างขึ้นซึ่งระบุไว้ใน“ / tmp / kafka-logs /“ ในไฟล์ config / server.properties

รายการหัวข้อ

ในการรับรายการหัวข้อในเซิร์ฟเวอร์ Kafka คุณสามารถใช้คำสั่งต่อไปนี้ -

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

เนื่องจากเราได้สร้างหัวข้อจึงจะแสดงรายการHello-Kafkaเท่านั้น สมมติว่าหากคุณสร้างมากกว่าหนึ่งหัวข้อคุณจะได้รับชื่อหัวข้อในผลลัพธ์

เริ่ม Producer เพื่อส่งข้อความ

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

จากไวยากรณ์ข้างต้นจำเป็นต้องมีพารามิเตอร์หลักสองตัวสำหรับไคลเอนต์บรรทัดคำสั่งผู้ผลิต -

Broker-list- รายชื่อโบรกเกอร์ที่เราต้องการส่งข้อความไป ในกรณีนี้เรามีนายหน้าเพียงรายเดียว ไฟล์ Config / server.properties มี id พอร์ตโบรกเกอร์เนื่องจากเราทราบว่าโบรกเกอร์ของเรากำลังฟังพอร์ต 9092 ดังนั้นคุณจึงสามารถระบุได้โดยตรง

ชื่อหัวข้อ - นี่คือตัวอย่างสำหรับชื่อหัวข้อ

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

โปรดิวเซอร์จะรออินพุตจาก stdin และเผยแพร่ไปยังคลัสเตอร์ Kafka โดยดีฟอลต์ทุกบรรทัดใหม่จะถูกเผยแพร่เป็นข้อความใหม่จากนั้นคุณสมบัติผู้ผลิตดีฟอลต์ถูกระบุในไฟล์config / producer.properties ตอนนี้คุณสามารถพิมพ์ข้อความสองสามบรรทัดในเทอร์มินัลดังที่แสดงด้านล่าง

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

เริ่มให้ผู้บริโภครับข้อความ

คล้ายกับการผลิตคุณสมบัติของผู้บริโภคเริ่มต้นที่ระบุไว้ในconfig / consumer.proper สัมพันธ์ไฟล์ เปิดเทอร์มินัลใหม่และพิมพ์ไวยากรณ์ด้านล่างเพื่อใช้ข้อความ

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

สุดท้ายคุณสามารถป้อนข้อความจากเทอร์มินัลของผู้ผลิตและเห็นข้อความเหล่านั้นปรากฏในเทอร์มินัลของผู้บริโภค ณ ตอนนี้คุณมีความเข้าใจเป็นอย่างดีเกี่ยวกับคลัสเตอร์โหนดเดียวกับโบรกเกอร์เดียว ตอนนี้ให้เราไปที่การกำหนดค่าโบรกเกอร์หลายรายการ

การกำหนดค่าโบรกเกอร์หลายโหนดเดียว

ก่อนที่จะย้ายไปยังการตั้งค่าคลัสเตอร์โบรกเกอร์หลาย ๆ รายให้เริ่มเซิร์ฟเวอร์ ZooKeeper ของคุณก่อน

Create Multiple Kafka Brokers- เรามีอินสแตนซ์โบรกเกอร์ Kafka หนึ่งรายการอยู่แล้วใน con-fig / server.properties ตอนนี้เราต้องการอินสแตนซ์โบรกเกอร์หลายอินสแตนซ์ดังนั้นให้คัดลอกไฟล์ server.prop-erties ที่มีอยู่ลงในไฟล์ config ใหม่สองไฟล์แล้วเปลี่ยนชื่อเป็น server-one.properties และ server-two.prop-erties จากนั้นแก้ไขไฟล์ใหม่ทั้งสองไฟล์และกำหนดการเปลี่ยนแปลงต่อไปนี้ -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers- หลังจากทำการเปลี่ยนแปลงทั้งหมดบนเซิร์ฟเวอร์สามเครื่องแล้วให้เปิดเทอร์มินัลใหม่สามเทอร์มินัลเพื่อเริ่มนายหน้าทีละคน

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

ตอนนี้เรามีโบรกเกอร์สามแห่งที่ทำงานบนเครื่อง ลองด้วยตัวเองเพื่อตรวจสอบภูตทั้งหมดโดยพิมพ์jps บนเทอร์มินัล ZooKeeper จากนั้นคุณจะเห็นการตอบสนอง

การสร้างหัวข้อ

ให้เรากำหนดค่าปัจจัยการจำลองเป็นสามสำหรับหัวข้อนี้เนื่องจากเรามีโบรกเกอร์สามแห่งที่ทำงานอยู่ หากคุณมีโบรกเกอร์สองรายค่าแบบจำลองที่กำหนดจะเป็นสอง

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

อธิบายคำสั่งที่ใช้ในการตรวจสอบซึ่งนายหน้าจะฟังในหัวข้อสร้างขึ้นในปัจจุบันที่แสดงด้านล่าง -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

จากผลลัพธ์ข้างต้นเราสามารถสรุปได้ว่าบรรทัดแรกให้ข้อมูลสรุปของพาร์ติชันทั้งหมดแสดงชื่อหัวข้อจำนวนพาร์ติชันและปัจจัยการจำลองแบบที่เราได้เลือกไว้แล้ว ในบรรทัดที่สองแต่ละโหนดจะเป็นผู้นำสำหรับส่วนที่สุ่มเลือกของพาร์ติชัน

ในกรณีของเราเราเห็นว่าโบรกเกอร์รายแรกของเรา (ที่มีโบรกเกอร์ id 0) เป็นผู้นำ จากนั้น Replicas: 0,2,1 หมายความว่าโบรกเกอร์ทั้งหมดทำซ้ำหัวข้อในที่สุดIsrก็คือชุดของแบบจำลองในการซิงค์ นี่คือส่วนย่อยของแบบจำลองที่มีชีวิตอยู่ในปัจจุบันและถูกจับโดยผู้นำ

เริ่ม Producer เพื่อส่งข้อความ

ขั้นตอนนี้ยังคงเหมือนกับในการตั้งค่านายหน้าเดี่ยว

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

เริ่มให้ผู้บริโภครับข้อความ

ขั้นตอนนี้ยังคงเหมือนเดิมตามที่แสดงในการตั้งค่านายหน้าเดี่ยว

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

การใช้งานหัวข้อพื้นฐาน

ในบทนี้เราจะพูดถึงการดำเนินงานของหัวข้อพื้นฐานต่างๆ

การแก้ไขหัวข้อ

ดังที่คุณได้เข้าใจวิธีสร้างหัวข้อใน Kafka Cluster แล้ว ตอนนี้ให้เราแก้ไขหัวข้อที่สร้างขึ้นโดยใช้คำสั่งต่อไปนี้

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

การลบหัวข้อ

ในการลบหัวข้อคุณสามารถใช้ไวยากรณ์ต่อไปนี้

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −สิ่งนี้จะไม่มีผลกระทบหาก delete.topic.enable ไม่ได้ตั้งค่าเป็นจริง