Apache Kafka - การทำงานขั้นพื้นฐาน
ขั้นแรกให้เราเริ่มใช้การกำหนดค่าโบรกเกอร์โหนดเดียว
จากนั้นเราจะย้ายการตั้งค่าของเราไปยังการกำหนดค่าโบรกเกอร์โหนดเดียวหลายรายการ
หวังว่าคุณจะติดตั้ง Java, ZooKeeper และ Kafka บนเครื่องของคุณในตอนนี้ ก่อนที่จะย้ายไปที่ Kafka Cluster Setup ก่อนอื่นคุณต้องเริ่ม ZooKeeper ของคุณเนื่องจาก Kafka Cluster ใช้ ZooKeeper
เริ่ม ZooKeeper
เปิดเทอร์มินัลใหม่และพิมพ์คำสั่งต่อไปนี้ -
bin/zookeeper-server-start.sh config/zookeeper.propertiesในการเริ่ม Kafka Broker ให้พิมพ์คำสั่งต่อไปนี้ -
bin/kafka-server-start.sh config/server.propertiesหลังจากเริ่ม Kafka Broker พิมพ์คำสั่งjps
บนเทอร์มินัล ZooKeeper และคุณจะเห็นคำตอบต่อไปนี้ -
821 QuorumPeerMain
928 Kafka
931 Jpsตอนนี้คุณสามารถเห็นภูตสองตัวที่ทำงานบนเทอร์มินัลโดยที่ QuorumPeerMain คือ ZooKeeper daemon และอีกตัวคือ Kafka daemon
การกำหนดค่าโบรกเกอร์โหนดเดียว - เดี่ยว
ในการกำหนดค่านี้คุณมีอินสแตนซ์ ZooKeeper และรหัสนายหน้าเพียงชุดเดียว ต่อไปนี้เป็นขั้นตอนในการกำหนดค่า -
Creating a Kafka Topic- Kafka มียูทิลิตี้บรรทัดคำสั่งชื่อkafka-topics.sh
เพื่อสร้างหัวข้อบนเซิร์ฟเวอร์ เปิดเทอร์มินัลใหม่และพิมพ์ตัวอย่างด้านล่าง
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-nameExample
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafkaเราเพิ่งสร้างหัวข้อชื่อHello-Kafka
ด้วยพาร์ติชันเดียวและตัวประกอบการจำลองหนึ่งตัว ผลลัพธ์ที่สร้างขึ้นข้างต้นจะคล้ายกับผลลัพธ์ต่อไปนี้ -
Output- สร้างหัวข้อHello-Kafka
เมื่อสร้างหัวข้อแล้วคุณจะได้รับการแจ้งเตือนในหน้าต่างเทอร์มินัลโบรกเกอร์ Kafka และบันทึกสำหรับหัวข้อที่สร้างขึ้นซึ่งระบุไว้ใน“ / tmp / kafka-logs /“ ในไฟล์ config / server.properties
รายการหัวข้อ
ในการรับรายการหัวข้อในเซิร์ฟเวอร์ Kafka คุณสามารถใช้คำสั่งต่อไปนี้ -
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181Output
Hello-Kafkaเนื่องจากเราได้สร้างหัวข้อจึงจะแสดงรายการHello-Kafka
เท่านั้น สมมติว่าหากคุณสร้างมากกว่าหนึ่งหัวข้อคุณจะได้รับชื่อหัวข้อในผลลัพธ์
เริ่ม Producer เพื่อส่งข้อความ
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-nameจากไวยากรณ์ข้างต้นจำเป็นต้องมีพารามิเตอร์หลักสองตัวสำหรับไคลเอนต์บรรทัดคำสั่งผู้ผลิต -
Broker-list- รายชื่อโบรกเกอร์ที่เราต้องการส่งข้อความไป ในกรณีนี้เรามีนายหน้าเพียงรายเดียว ไฟล์ Config / server.properties มี id พอร์ตโบรกเกอร์เนื่องจากเราทราบว่าโบรกเกอร์ของเรากำลังฟังพอร์ต 9092 ดังนั้นคุณจึงสามารถระบุได้โดยตรง
ชื่อหัวข้อ - นี่คือตัวอย่างสำหรับชื่อหัวข้อ
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafkaโปรดิวเซอร์จะรออินพุตจาก stdin และเผยแพร่ไปยังคลัสเตอร์ Kafka โดยดีฟอลต์ทุกบรรทัดใหม่จะถูกเผยแพร่เป็นข้อความใหม่จากนั้นคุณสมบัติผู้ผลิตดีฟอลต์ถูกระบุในไฟล์config / producer.properties 
ตอนนี้คุณสามารถพิมพ์ข้อความสองสามบรรทัดในเทอร์มินัลดังที่แสดงด้านล่าง
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first messageMy second messageเริ่มให้ผู้บริโภครับข้อความ
คล้ายกับการผลิตคุณสมบัติของผู้บริโภคเริ่มต้นที่ระบุไว้ในconfig / consumer.proper สัมพันธ์
ไฟล์ เปิดเทอร์มินัลใหม่และพิมพ์ไวยากรณ์ด้านล่างเพื่อใช้ข้อความ
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginningExample
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginningOutput
Hello
My first message
My second messageสุดท้ายคุณสามารถป้อนข้อความจากเทอร์มินัลของผู้ผลิตและเห็นข้อความเหล่านั้นปรากฏในเทอร์มินัลของผู้บริโภค ณ ตอนนี้คุณมีความเข้าใจเป็นอย่างดีเกี่ยวกับคลัสเตอร์โหนดเดียวกับโบรกเกอร์เดียว ตอนนี้ให้เราไปที่การกำหนดค่าโบรกเกอร์หลายรายการ
การกำหนดค่าโบรกเกอร์หลายโหนดเดียว
ก่อนที่จะย้ายไปยังการตั้งค่าคลัสเตอร์โบรกเกอร์หลาย ๆ รายให้เริ่มเซิร์ฟเวอร์ ZooKeeper ของคุณก่อน
Create Multiple Kafka Brokers- เรามีอินสแตนซ์โบรกเกอร์ Kafka หนึ่งรายการอยู่แล้วใน con-fig / server.properties ตอนนี้เราต้องการอินสแตนซ์โบรกเกอร์หลายอินสแตนซ์ดังนั้นให้คัดลอกไฟล์ server.prop-erties ที่มีอยู่ลงในไฟล์ config ใหม่สองไฟล์แล้วเปลี่ยนชื่อเป็น server-one.properties และ server-two.prop-erties จากนั้นแก้ไขไฟล์ใหม่ทั้งสองไฟล์และกำหนดการเปลี่ยนแปลงต่อไปนี้ -
config / server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2Start Multiple Brokers- หลังจากทำการเปลี่ยนแปลงทั้งหมดบนเซิร์ฟเวอร์สามเครื่องแล้วให้เปิดเทอร์มินัลใหม่สามเทอร์มินัลเพื่อเริ่มนายหน้าทีละคน
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.propertiesตอนนี้เรามีโบรกเกอร์สามแห่งที่ทำงานบนเครื่อง ลองด้วยตัวเองเพื่อตรวจสอบภูตทั้งหมดโดยพิมพ์jps บนเทอร์มินัล ZooKeeper จากนั้นคุณจะเห็นการตอบสนอง
การสร้างหัวข้อ
ให้เรากำหนดค่าปัจจัยการจำลองเป็นสามสำหรับหัวข้อนี้เนื่องจากเรามีโบรกเกอร์สามแห่งที่ทำงานอยู่ หากคุณมีโบรกเกอร์สองรายค่าแบบจำลองที่กำหนดจะเป็นสอง
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-nameExample
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic MultibrokerapplicationOutput
created topic “Multibrokerapplication”อธิบาย
คำสั่งที่ใช้ในการตรวจสอบซึ่งนายหน้าจะฟังในหัวข้อสร้างขึ้นในปัจจุบันที่แสดงด้านล่าง -
bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cationOutput
bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation
Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1จากผลลัพธ์ข้างต้นเราสามารถสรุปได้ว่าบรรทัดแรกให้ข้อมูลสรุปของพาร์ติชันทั้งหมดแสดงชื่อหัวข้อจำนวนพาร์ติชันและปัจจัยการจำลองแบบที่เราได้เลือกไว้แล้ว ในบรรทัดที่สองแต่ละโหนดจะเป็นผู้นำสำหรับส่วนที่สุ่มเลือกของพาร์ติชัน
ในกรณีของเราเราเห็นว่าโบรกเกอร์รายแรกของเรา (ที่มีโบรกเกอร์ id 0) เป็นผู้นำ จากนั้น Replicas: 0,2,1 หมายความว่าโบรกเกอร์ทั้งหมดทำซ้ำหัวข้อในที่สุดIsr
ก็คือชุดของแบบจำลองในการซิงค์ 
นี่คือส่วนย่อยของแบบจำลองที่มีชีวิตอยู่ในปัจจุบันและถูกจับโดยผู้นำ
เริ่ม Producer เพื่อส่งข้อความ
ขั้นตอนนี้ยังคงเหมือนกับในการตั้งค่านายหน้าเดี่ยว
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic MultibrokerapplicationOutput
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second messageเริ่มให้ผู้บริโภครับข้อความ
ขั้นตอนนี้ยังคงเหมือนเดิมตามที่แสดงในการตั้งค่านายหน้าเดี่ยว
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginningOutput
bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second messageการใช้งานหัวข้อพื้นฐาน
ในบทนี้เราจะพูดถึงการดำเนินงานของหัวข้อพื้นฐานต่างๆ
การแก้ไขหัวข้อ
ดังที่คุณได้เข้าใจวิธีสร้างหัวข้อใน Kafka Cluster แล้ว ตอนนี้ให้เราแก้ไขหัวข้อที่สร้างขึ้นโดยใช้คำสั่งต่อไปนี้
Syntax
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions countExample
We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2Output
WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!การลบหัวข้อ
ในการลบหัวข้อคุณสามารถใช้ไวยากรณ์ต่อไปนี้
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_nameExample
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafkaOutput
> Topic Hello-kafka marked for deletionNote −สิ่งนี้จะไม่มีผลกระทบหาก delete.topic.enable ไม่ได้ตั้งค่าเป็นจริง