Apache Kafka - การทำงานขั้นพื้นฐาน
ขั้นแรกให้เราเริ่มใช้การกำหนดค่าโบรกเกอร์โหนดเดียว
จากนั้นเราจะย้ายการตั้งค่าของเราไปยังการกำหนดค่าโบรกเกอร์โหนดเดียวหลายรายการ
หวังว่าคุณจะติดตั้ง Java, ZooKeeper และ Kafka บนเครื่องของคุณในตอนนี้ ก่อนที่จะย้ายไปที่ Kafka Cluster Setup ก่อนอื่นคุณต้องเริ่ม ZooKeeper ของคุณเนื่องจาก Kafka Cluster ใช้ ZooKeeper
เริ่ม ZooKeeper
เปิดเทอร์มินัลใหม่และพิมพ์คำสั่งต่อไปนี้ -
bin/zookeeper-server-start.sh config/zookeeper.properties
ในการเริ่ม Kafka Broker ให้พิมพ์คำสั่งต่อไปนี้ -
bin/kafka-server-start.sh config/server.properties
หลังจากเริ่ม Kafka Broker พิมพ์คำสั่งjps
บนเทอร์มินัล ZooKeeper และคุณจะเห็นคำตอบต่อไปนี้ -
821 QuorumPeerMain
928 Kafka
931 Jps
ตอนนี้คุณสามารถเห็นภูตสองตัวที่ทำงานบนเทอร์มินัลโดยที่ QuorumPeerMain คือ ZooKeeper daemon และอีกตัวคือ Kafka daemon
การกำหนดค่าโบรกเกอร์โหนดเดียว - เดี่ยว
ในการกำหนดค่านี้คุณมีอินสแตนซ์ ZooKeeper และรหัสนายหน้าเพียงชุดเดียว ต่อไปนี้เป็นขั้นตอนในการกำหนดค่า -
Creating a Kafka Topic- Kafka มียูทิลิตี้บรรทัดคำสั่งชื่อkafka-topics.sh
เพื่อสร้างหัวข้อบนเซิร์ฟเวอร์ เปิดเทอร์มินัลใหม่และพิมพ์ตัวอย่างด้านล่าง
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
เราเพิ่งสร้างหัวข้อชื่อHello-Kafka
ด้วยพาร์ติชันเดียวและตัวประกอบการจำลองหนึ่งตัว ผลลัพธ์ที่สร้างขึ้นข้างต้นจะคล้ายกับผลลัพธ์ต่อไปนี้ -
Output- สร้างหัวข้อHello-Kafka
เมื่อสร้างหัวข้อแล้วคุณจะได้รับการแจ้งเตือนในหน้าต่างเทอร์มินัลโบรกเกอร์ Kafka และบันทึกสำหรับหัวข้อที่สร้างขึ้นซึ่งระบุไว้ใน“ / tmp / kafka-logs /“ ในไฟล์ config / server.properties
รายการหัวข้อ
ในการรับรายการหัวข้อในเซิร์ฟเวอร์ Kafka คุณสามารถใช้คำสั่งต่อไปนี้ -
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181
Output
Hello-Kafka
เนื่องจากเราได้สร้างหัวข้อจึงจะแสดงรายการHello-Kafka
เท่านั้น สมมติว่าหากคุณสร้างมากกว่าหนึ่งหัวข้อคุณจะได้รับชื่อหัวข้อในผลลัพธ์
เริ่ม Producer เพื่อส่งข้อความ
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
จากไวยากรณ์ข้างต้นจำเป็นต้องมีพารามิเตอร์หลักสองตัวสำหรับไคลเอนต์บรรทัดคำสั่งผู้ผลิต -
Broker-list- รายชื่อโบรกเกอร์ที่เราต้องการส่งข้อความไป ในกรณีนี้เรามีนายหน้าเพียงรายเดียว ไฟล์ Config / server.properties มี id พอร์ตโบรกเกอร์เนื่องจากเราทราบว่าโบรกเกอร์ของเรากำลังฟังพอร์ต 9092 ดังนั้นคุณจึงสามารถระบุได้โดยตรง
ชื่อหัวข้อ - นี่คือตัวอย่างสำหรับชื่อหัวข้อ
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
โปรดิวเซอร์จะรออินพุตจาก stdin และเผยแพร่ไปยังคลัสเตอร์ Kafka โดยดีฟอลต์ทุกบรรทัดใหม่จะถูกเผยแพร่เป็นข้อความใหม่จากนั้นคุณสมบัติผู้ผลิตดีฟอลต์ถูกระบุในไฟล์config / producer.properties
ตอนนี้คุณสามารถพิมพ์ข้อความสองสามบรรทัดในเทอร์มินัลดังที่แสดงด้านล่าง
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
เริ่มให้ผู้บริโภครับข้อความ
คล้ายกับการผลิตคุณสมบัติของผู้บริโภคเริ่มต้นที่ระบุไว้ในconfig / consumer.proper สัมพันธ์
ไฟล์ เปิดเทอร์มินัลใหม่และพิมพ์ไวยากรณ์ด้านล่างเพื่อใช้ข้อความ
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning
Output
Hello
My first message
My second message
สุดท้ายคุณสามารถป้อนข้อความจากเทอร์มินัลของผู้ผลิตและเห็นข้อความเหล่านั้นปรากฏในเทอร์มินัลของผู้บริโภค ณ ตอนนี้คุณมีความเข้าใจเป็นอย่างดีเกี่ยวกับคลัสเตอร์โหนดเดียวกับโบรกเกอร์เดียว ตอนนี้ให้เราไปที่การกำหนดค่าโบรกเกอร์หลายรายการ
การกำหนดค่าโบรกเกอร์หลายโหนดเดียว
ก่อนที่จะย้ายไปยังการตั้งค่าคลัสเตอร์โบรกเกอร์หลาย ๆ รายให้เริ่มเซิร์ฟเวอร์ ZooKeeper ของคุณก่อน
Create Multiple Kafka Brokers- เรามีอินสแตนซ์โบรกเกอร์ Kafka หนึ่งรายการอยู่แล้วใน con-fig / server.properties ตอนนี้เราต้องการอินสแตนซ์โบรกเกอร์หลายอินสแตนซ์ดังนั้นให้คัดลอกไฟล์ server.prop-erties ที่มีอยู่ลงในไฟล์ config ใหม่สองไฟล์แล้วเปลี่ยนชื่อเป็น server-one.properties และ server-two.prop-erties จากนั้นแก้ไขไฟล์ใหม่ทั้งสองไฟล์และกำหนดการเปลี่ยนแปลงต่อไปนี้ -
config / server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
Start Multiple Brokers- หลังจากทำการเปลี่ยนแปลงทั้งหมดบนเซิร์ฟเวอร์สามเครื่องแล้วให้เปิดเทอร์มินัลใหม่สามเทอร์มินัลเพื่อเริ่มนายหน้าทีละคน
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
ตอนนี้เรามีโบรกเกอร์สามแห่งที่ทำงานบนเครื่อง ลองด้วยตัวเองเพื่อตรวจสอบภูตทั้งหมดโดยพิมพ์jps บนเทอร์มินัล ZooKeeper จากนั้นคุณจะเห็นการตอบสนอง
การสร้างหัวข้อ
ให้เรากำหนดค่าปัจจัยการจำลองเป็นสามสำหรับหัวข้อนี้เนื่องจากเรามีโบรกเกอร์สามแห่งที่ทำงานอยู่ หากคุณมีโบรกเกอร์สองรายค่าแบบจำลองที่กำหนดจะเป็นสอง
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
Output
created topic “Multibrokerapplication”
อธิบาย
คำสั่งที่ใช้ในการตรวจสอบซึ่งนายหน้าจะฟังในหัวข้อสร้างขึ้นในปัจจุบันที่แสดงด้านล่าง -
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Output
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1
ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1
จากผลลัพธ์ข้างต้นเราสามารถสรุปได้ว่าบรรทัดแรกให้ข้อมูลสรุปของพาร์ติชันทั้งหมดแสดงชื่อหัวข้อจำนวนพาร์ติชันและปัจจัยการจำลองแบบที่เราได้เลือกไว้แล้ว ในบรรทัดที่สองแต่ละโหนดจะเป็นผู้นำสำหรับส่วนที่สุ่มเลือกของพาร์ติชัน
ในกรณีของเราเราเห็นว่าโบรกเกอร์รายแรกของเรา (ที่มีโบรกเกอร์ id 0) เป็นผู้นำ จากนั้น Replicas: 0,2,1 หมายความว่าโบรกเกอร์ทั้งหมดทำซ้ำหัวข้อในที่สุดIsr
ก็คือชุดของแบบจำลองในการซิงค์
นี่คือส่วนย่อยของแบบจำลองที่มีชีวิตอยู่ในปัจจุบันและถูกจับโดยผู้นำ
เริ่ม Producer เพื่อส่งข้อความ
ขั้นตอนนี้ยังคงเหมือนกับในการตั้งค่านายหน้าเดี่ยว
Example
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
Output
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message
เริ่มให้ผู้บริโภครับข้อความ
ขั้นตอนนี้ยังคงเหมือนเดิมตามที่แสดงในการตั้งค่านายหน้าเดี่ยว
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning
Output
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
การใช้งานหัวข้อพื้นฐาน
ในบทนี้เราจะพูดถึงการดำเนินงานของหัวข้อพื้นฐานต่างๆ
การแก้ไขหัวข้อ
ดังที่คุณได้เข้าใจวิธีสร้างหัวข้อใน Kafka Cluster แล้ว ตอนนี้ให้เราแก้ไขหัวข้อที่สร้างขึ้นโดยใช้คำสั่งต่อไปนี้
Syntax
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
Example
We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2
Output
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
การลบหัวข้อ
ในการลบหัวข้อคุณสามารถใช้ไวยากรณ์ต่อไปนี้
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
Example
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Output
> Topic Hello-kafka marked for deletion
Note −สิ่งนี้จะไม่มีผลกระทบหาก delete.topic.enable ไม่ได้ตั้งค่าเป็นจริง