Apache Kafka - คู่มือฉบับย่อ

ใน Big Data มีการใช้ข้อมูลจำนวนมหาศาล เกี่ยวกับข้อมูลเรามีความท้าทายหลักสองประการความท้าทายแรกคือวิธีรวบรวมข้อมูลจำนวนมากและความท้าทายประการที่สองคือการวิเคราะห์ข้อมูลที่รวบรวม เพื่อเอาชนะความท้าทายเหล่านั้นคุณต้องมีระบบส่งข้อความ

Kafka ออกแบบมาสำหรับระบบปริมาณงานสูงแบบกระจาย Kafka มีแนวโน้มที่จะทำงานได้ดีมากในฐานะตัวแทนนายหน้าข้อความแบบเดิม ๆ เมื่อเปรียบเทียบกับระบบการส่งข้อความอื่น Kafka มีทรูพุตที่ดีกว่าการแบ่งพาร์ติชันในตัวการจำลองแบบและการทนต่อความผิดพลาดโดยธรรมชาติซึ่งทำให้เหมาะสำหรับแอปพลิเคชันการประมวลผลข้อความขนาดใหญ่

ระบบส่งข้อความคืออะไร?

ระบบส่งข้อความมีหน้าที่ในการถ่ายโอนข้อมูลจากแอปพลิเคชันหนึ่งไปยังอีกแอปพลิเคชันดังนั้นแอปพลิเคชันจึงสามารถมุ่งเน้นไปที่ข้อมูลได้ แต่ไม่ต้องกังวลเกี่ยวกับวิธีการแบ่งปัน การส่งข้อความแบบกระจายจะขึ้นอยู่กับแนวคิดของการจัดคิวข้อความที่เชื่อถือได้ ข้อความถูกจัดคิวแบบอะซิงโครนัสระหว่างแอปพลิเคชันไคลเอ็นต์และระบบการส่งข้อความ รูปแบบการส่งข้อความมีให้เลือกสองแบบคือแบบชี้ต่อจุดและอีกแบบคือระบบการส่งข้อความเผยแพร่สมัครสมาชิก (pub-sub) รูปแบบการส่งข้อความส่วนใหญ่เป็นไปตามpub-sub.

ชี้ไปที่ระบบส่งข้อความจุด

ในระบบจุดต่อจุดข้อความจะยังคงอยู่ในคิว ผู้บริโภคอย่างน้อยหนึ่งรายสามารถใช้ข้อความในคิวได้ แต่ข้อความหนึ่ง ๆ สามารถใช้งานได้โดยผู้บริโภคสูงสุดหนึ่งรายเท่านั้น เมื่อผู้บริโภคอ่านข้อความในคิวข้อความนั้นจะหายไปจากคิวนั้น ตัวอย่างทั่วไปของระบบนี้คือระบบประมวลผลคำสั่งซึ่งแต่ละคำสั่งจะได้รับการประมวลผลโดยตัวประมวลผลคำสั่งเดียว แต่โปรเซสเซอร์หลายคำสั่งสามารถทำงานได้เช่นกันในเวลาเดียวกัน แผนภาพต่อไปนี้แสดงถึงโครงสร้าง

เผยแพร่ - สมัครรับข้อความระบบ

ในระบบเผยแพร่ - สมัครสมาชิกข้อความจะยังคงอยู่ในหัวข้อ แตกต่างจากระบบจุดต่อจุดผู้บริโภคสามารถสมัครสมาชิกหัวข้อหนึ่งหรือหลายหัวข้อและใช้ข้อความทั้งหมดในหัวข้อนั้น ในระบบการสมัครสมาชิกเผยแพร่ผู้ผลิตข้อความเรียกว่าผู้เผยแพร่และผู้บริโภคข้อความเรียกว่าสมาชิก ตัวอย่างในชีวิตจริงคือ Dish TV ซึ่งเผยแพร่ช่องต่างๆเช่นกีฬาภาพยนตร์เพลง ฯลฯ และทุกคนสามารถสมัครรับข้อมูลจากชุดช่องของตนเองและรับเมื่อใดก็ตามที่มีช่องที่สมัครรับข้อมูล

คาฟคาคืออะไร?

Apache Kafka เป็นระบบส่งข้อความแบบเผยแพร่ - สมัครสมาชิกแบบกระจายและคิวที่มีประสิทธิภาพซึ่งสามารถจัดการข้อมูลจำนวนมากและช่วยให้คุณสามารถส่งผ่านข้อความจากปลายทางหนึ่งไปยังอีกจุดหนึ่งได้ คาฟคาเหมาะสำหรับการใช้ข้อความทั้งออฟไลน์และออนไลน์ ข้อความ Kafka จะยังคงอยู่บนดิสก์และจำลองแบบภายในคลัสเตอร์เพื่อป้องกันข้อมูลสูญหาย Kafka สร้างขึ้นจากบริการซิงโครไนซ์ ZooKeeper ทำงานร่วมกับ Apache Storm และ Spark ได้เป็นอย่างดีสำหรับการวิเคราะห์ข้อมูลสตรีมมิ่งแบบเรียลไทม์

สิทธิประโยชน์

ต่อไปนี้เป็นประโยชน์บางประการของ Kafka -

  • Reliability - Kafka มีการแจกจ่ายแบ่งพาร์ติชันจำลองแบบและความทนทานต่อความผิดพลาด

  • Scalability - ระบบส่งข้อความ Kafka ปรับขนาดได้อย่างง่ายดายโดยไม่ต้องเสียเวลา ..

  • Durability- Kafka ใช้บันทึกการกระทำแบบกระจายซึ่งหมายความว่าข้อความยังคงอยู่บนดิสก์โดยเร็วที่สุดจึงมีความทนทาน ..

  • Performance- Kafka มีปริมาณงานสูงสำหรับทั้งการเผยแพร่และการสมัครรับข้อความ รักษาประสิทธิภาพการทำงานที่เสถียรแม้จะเก็บข้อความไว้หลาย TB

Kafka รวดเร็วมากและรับประกันการหยุดทำงานเป็นศูนย์และข้อมูลสูญหายเป็นศูนย์

ใช้กรณี

Kafka สามารถใช้กับ Use Case ได้หลายแบบ บางส่วนมีการระบุไว้ด้านล่าง -

  • Metrics- คาฟคามักใช้สำหรับข้อมูลการตรวจสอบการปฏิบัติงาน สิ่งนี้เกี่ยวข้องกับการรวบรวมสถิติจากแอปพลิเคชันแบบกระจายเพื่อสร้างฟีดข้อมูลการดำเนินงานจากส่วนกลาง

  • Log Aggregation Solution - Kafka สามารถใช้งานได้ทั่วทั้งองค์กรเพื่อรวบรวมบันทึกจากบริการที่หลากหลายและทำให้สามารถใช้งานได้ในรูปแบบมาตรฐานสำหรับผู้ใช้หลายคน

  • Stream Processing- เฟรมเวิร์กยอดนิยมเช่น Storm และ Spark Streaming อ่านข้อมูลจากหัวข้อประมวลผลและเขียนข้อมูลที่ประมวลผลไปยังหัวข้อใหม่ซึ่งพร้อมใช้งานสำหรับผู้ใช้และแอปพลิเคชัน ความทนทานที่แข็งแกร่งของ Kafka ยังมีประโยชน์อย่างมากในบริบทของการประมวลผลสตรีม

ต้องการคาฟคา

Kafka เป็นแพลตฟอร์มแบบครบวงจรสำหรับจัดการฟีดข้อมูลแบบเรียลไทม์ทั้งหมด Kafka รองรับการส่งข้อความเวลาแฝงต่ำและรับประกันความทนทานต่อความผิดพลาดในกรณีที่เครื่องขัดข้อง มีความสามารถในการรองรับผู้บริโภคที่หลากหลายจำนวนมาก Kafka เร็วมากเขียน 2 ล้านครั้ง / วินาที Kafka ยังคงเก็บข้อมูลทั้งหมดไว้ในดิสก์ซึ่งโดยพื้นฐานแล้วหมายความว่าสิ่งที่เขียนทั้งหมดไปที่แคชของหน้าของ OS (RAM) ทำให้การถ่ายโอนข้อมูลจากแคชของเพจไปยังซ็อกเก็ตเครือข่ายมีประสิทธิภาพมาก

ก่อนที่จะก้าวเข้าสู่คาฟคาคุณต้องตระหนักถึงคำศัพท์หลัก ๆ เช่นหัวข้อนายหน้าผู้ผลิตและผู้บริโภค แผนภาพต่อไปนี้แสดงคำศัพท์หลักและตารางอธิบายส่วนประกอบของแผนภาพโดยละเอียด

ในแผนภาพด้านบนหัวข้อถูกกำหนดค่าเป็นสามพาร์ติชัน พาร์ติชั่น 1 มีปัจจัยออฟเซ็ต 2 ตัว 0 และ 1 พาร์ติชั่น 2 มีสี่ออฟเซ็ตแฟคเตอร์ 0, 1, 2 และ 3 พาร์ติชั่น 3 มีอ็อฟเซ็ตแฟคเตอร์ 0 หนึ่งไอดีของแบบจำลองจะเหมือนกับ id ของเซิร์ฟเวอร์ที่โฮสต์มัน

สมมติว่าหากตั้งค่าปัจจัยการจำลองแบบของหัวข้อเป็น 3 Kafka จะสร้างแบบจำลองที่เหมือนกัน 3 รายการของแต่ละพาร์ติชันและวางไว้ในคลัสเตอร์เพื่อให้พร้อมใช้งานสำหรับการดำเนินการทั้งหมด ในการปรับสมดุลภาระในคลัสเตอร์แต่ละโบรกเกอร์จะจัดเก็บพาร์ติชันเหล่านั้นอย่างน้อยหนึ่งพาร์ติชัน ผู้ผลิตและผู้บริโภคหลายรายสามารถเผยแพร่และเรียกดูข้อความได้ในเวลาเดียวกัน

ส. เลขที่ ส่วนประกอบและคำอธิบาย
1

Topics

กระแสของข้อความที่อยู่ในหมวดหมู่หนึ่งเรียกว่าหัวข้อ ข้อมูลถูกจัดเก็บในหัวข้อ

หัวข้อจะแบ่งออกเป็นพาร์ติชัน สำหรับแต่ละหัวข้อ Kafka จะเก็บมินิมัมไว้หนึ่งพาร์ติชัน แต่ละพาร์ติชันดังกล่าวมีข้อความตามลำดับที่ไม่เปลี่ยนรูป พาร์ติชันถูกใช้งานเป็นชุดของไฟล์เซ็กเมนต์ที่มีขนาดเท่ากัน

2

Partition

หัวข้ออาจมีหลายพาร์ติชั่นดังนั้นจึงสามารถจัดการกับข้อมูลได้ตามอำเภอใจ

3

Partition offset

แต่ละข้อความแบ่งพาร์ติชันที่มี ID ลำดับไม่ซ้ำกันเรียกว่าเป็นชดเชย

4

Replicas of partition

Replicas ไม่ใช่อะไรนอกจากการสำรองข้อมูลของพาร์ติชัน ข้อมูลจำลองจะไม่อ่านหรือเขียนข้อมูล ใช้เพื่อป้องกันข้อมูลสูญหาย

5

Brokers

  • โบรกเกอร์เป็นระบบที่เรียบง่ายที่รับผิดชอบในการดูแลรักษาข้อมูลที่มีชื่อเสียง แต่ละโบรกเกอร์อาจมีพาร์ติชั่นเป็นศูนย์หรือมากกว่านั้นต่อหัวข้อ สมมติว่าหากมี N พาร์ติชั่นในหัวข้อและ N จำนวนโบรกเกอร์แต่ละโบรกเกอร์จะมีพาร์ติชั่นเดียว

  • สมมติว่ามีพาร์ติชั่น N ในหัวข้อและมากกว่า N โบรกเกอร์ (n + m) โบรกเกอร์ N แรกจะมีพาร์ติชันเดียวและโบรกเกอร์ M ถัดไปจะไม่มีพาร์ติชันสำหรับหัวข้อนั้น ๆ

  • สมมติว่ามี N พาร์ติชั่นในหัวข้อและน้อยกว่า N โบรกเกอร์ (นาโนเมตร) แต่ละโบรกเกอร์จะมีพาร์ติชันร่วมกันอย่างน้อยหนึ่งพาร์ติชั่น ไม่แนะนำให้ใช้สถานการณ์นี้เนื่องจากโบรกเกอร์โหลด Distri-bution ไม่เท่ากัน

6

Kafka Cluster

คาฟคามีนายหน้ามากกว่าหนึ่งรายเรียกว่าคลัสเตอร์คาฟคา คลัสเตอร์ Kafka สามารถขยายได้โดยไม่ต้องหยุดทำงาน คลัสเตอร์เหล่านี้ใช้เพื่อจัดการการคงอยู่และการจำลองข้อมูลข้อความ

7

Producers

ผู้ผลิตเป็นผู้เผยแพร่ข้อความไปยังหัวข้อ Kafka อย่างน้อยหนึ่งหัวข้อ ผู้ผลิตส่งข้อมูลไปยังโบรกเกอร์คาฟคา ทุกครั้งที่ผู้ผลิตเผยแพร่ข้อความถึงนายหน้านายหน้าจะต่อท้ายข้อความไว้ในไฟล์ส่วนสุดท้าย จริงๆแล้วข้อความจะถูกต่อท้ายพาร์ติชัน Producer ยังสามารถส่งข้อความไปยังพาร์ติชันที่ต้องการได้

8

Consumers

ผู้บริโภคอ่านข้อมูลจากโบรกเกอร์ ผู้บริโภคสมัครรับข่าวสารตั้งแต่หนึ่งหัวข้อขึ้นไปและใช้ข้อความที่เผยแพร่โดยดึงข้อมูลจากโบรกเกอร์

9

Leader

Leaderคือโหนดที่รับผิดชอบการอ่านและเขียนทั้งหมดสำหรับพาร์ติชันที่กำหนด ทุกพาร์ติชันมีเซิร์ฟเวอร์หนึ่งเครื่องทำหน้าที่เป็นผู้นำ

10

Follower

โหนดซึ่งเป็นไปตามคำแนะนำของผู้นำเรียกว่าเป็นผู้ตาม หากผู้นำล้มเหลวผู้ตามคนใดคนหนึ่งจะกลายเป็นผู้นำคนใหม่โดยอัตโนมัติ ผู้ติดตามทำหน้าที่เป็นผู้บริโภคทั่วไปดึงข้อความและอัปเดตที่เก็บข้อมูลของตนเอง

ดูภาพประกอบต่อไปนี้ แสดงแผนภาพคลัสเตอร์ของคาฟคา

ตารางต่อไปนี้อธิบายส่วนประกอบแต่ละอย่างที่แสดงในแผนภาพด้านบน

ส. เลขที่ ส่วนประกอบและคำอธิบาย
1

Broker

โดยทั่วไปคลัสเตอร์ Kafka ประกอบด้วยโบรกเกอร์หลายรายเพื่อรักษาสมดุลของภาระงาน โบรกเกอร์ Kafka ไร้สัญชาติดังนั้นพวกเขาจึงใช้ ZooKeeper เพื่อรักษาสถานะคลัสเตอร์ของตน อินสแตนซ์โบรกเกอร์ Kafka หนึ่งรายการสามารถรองรับการอ่านและเขียนได้หลายแสนครั้งต่อวินาทีและ bro-ker แต่ละรายสามารถจัดการข้อความ TB ได้โดยไม่ส่งผลกระทบต่อประสิทธิภาพ การเลือกตั้งหัวหน้านายหน้าคาฟคาสามารถทำได้โดย ZooKeeper

2

ZooKeeper

ZooKeeper ใช้สำหรับจัดการและประสานงานนายหน้าคาฟคา บริการ ZooKeeper ส่วนใหญ่จะใช้เพื่อแจ้งให้ผู้ผลิตและผู้บริโภคทราบเกี่ยวกับการมีนายหน้าใหม่ในระบบ Kafka หรือความล้มเหลวของนายหน้าในระบบ Kafka ตามการแจ้งเตือนที่ได้รับจาก Zookeeper เกี่ยวกับการมีอยู่หรือความล้มเหลวของนายหน้าจากนั้นโปรดูเซอร์และผู้บริโภคจะตัดสินใจและเริ่มประสานงานกับนายหน้ารายอื่น

3

Producers

ผู้ผลิตส่งข้อมูลไปยังโบรกเกอร์ เมื่อนายหน้าใหม่เริ่มต้นผู้ผลิตทั้งหมดจะค้นหาและส่งข้อความไปยังนายหน้าใหม่นั้นโดยอัตโนมัติ ผู้ผลิต Kafka ไม่รอการตอบรับจากนายหน้าและส่งข้อความให้เร็วที่สุดเท่าที่นายหน้าจะจัดการได้

4

Consumers

เนื่องจากโบรกเกอร์ Kafka เป็นคนไร้สัญชาติซึ่งหมายความว่าผู้บริโภคต้องรักษาจำนวนข้อความที่ใช้ไปโดยใช้พาร์ติชันออฟเซ็ต หากผู้บริโภครับทราบการชดเชยข้อความใดข้อความหนึ่งแสดงว่าผู้บริโภคใช้ข้อความก่อนหน้านี้หมดแล้ว ผู้บริโภคส่งคำขอดึงแบบอะซิงโครนัสไปยังโบรกเกอร์เพื่อให้มีบัฟเฟอร์ของไบต์ที่พร้อมใช้งาน ผู้บริโภคสามารถย้อนกลับหรือข้ามไปยังจุดใดก็ได้ในพาร์ติชันเพียงแค่ใส่ค่าชดเชย ค่าชดเชยของผู้บริโภคจะแจ้งโดย ZooKeeper

ณ ตอนนี้เราได้กล่าวถึงแนวคิดหลักของคาฟคา ตอนนี้ให้เราแสดงความคิดเห็นเกี่ยวกับขั้นตอนการทำงานของ Kafka

Kafka เป็นเพียงชุดของหัวข้อที่แบ่งออกเป็นพาร์ติชันหนึ่งหรือหลายพาร์ติชัน พาร์ติชัน Kafka คือลำดับของข้อความที่เรียงตามลำดับเชิงเส้นโดยแต่ละข้อความจะถูกระบุโดยดัชนีของพวกเขา (เรียกว่าออฟเซ็ต) ข้อมูลทั้งหมดในคลัสเตอร์ Kafka คือการรวมกันของพาร์ติชันที่ไม่ปะติดปะต่อกัน ข้อความขาเข้าจะถูกเขียนไว้ที่ส่วนท้ายของพาร์ติชันและข้อความจะถูกอ่านโดยผู้บริโภคตามลำดับ ความทนทานมีให้โดยการจำลองข้อความไปยังโบรกเกอร์ต่างๆ

Kafka ให้บริการทั้งระบบส่งข้อความ pub-sub และตามคิวในลักษณะที่รวดเร็วเชื่อถือได้คงอยู่ทนต่อความผิดพลาดและไม่มีเวลาหยุดทำงาน ในทั้งสองกรณีผู้ผลิตเพียงแค่ส่งข้อความไปยังหัวข้อและผู้บริโภคสามารถเลือกระบบการส่งข้อความประเภทใดก็ได้ขึ้นอยู่กับความต้องการของพวกเขา ให้เราทำตามขั้นตอนในหัวข้อถัดไปเพื่อทำความเข้าใจว่าผู้บริโภคสามารถเลือกระบบการส่งข้อความที่ต้องการได้อย่างไร

เวิร์กโฟลว์ของ Pub-Sub Messaging

ต่อไปนี้เป็นขั้นตอนการทำงานที่ชาญฉลาดของ Pub-Sub Messaging -

  • ผู้ผลิตส่งข้อความถึงหัวข้อในช่วงเวลาปกติ

  • โบรกเกอร์ Kafka เก็บข้อความทั้งหมดในพาร์ติชันที่กำหนดค่าไว้สำหรับหัวข้อนั้น ๆ เพื่อให้แน่ใจว่าข้อความจะถูกแบ่งใช้อย่างเท่าเทียมกันระหว่างพาร์ติชัน หากผู้ผลิตส่งข้อความสองข้อความและมีสองพาร์ติชัน Kafka จะเก็บหนึ่งข้อความในพาร์ติชันแรกและข้อความที่สองในพาร์ติชันที่สอง

  • ผู้บริโภคสมัครรับหัวข้อเฉพาะ

  • เมื่อผู้บริโภคสมัครรับหัวข้อแล้ว Kafka จะให้ค่าชดเชยปัจจุบันของหัวข้อแก่ผู้บริโภคและยังบันทึกค่าชดเชยในชุด Zookeeper

  • ผู้บริโภคจะร้องขอ Kafka ในช่วงเวลาปกติ (เช่น 100 Ms) สำหรับข้อความใหม่

  • เมื่อคาฟคาได้รับข้อความจากผู้ผลิตระบบจะส่งต่อข้อความเหล่านี้ไปยังผู้บริโภค

  • ผู้บริโภคจะได้รับข้อความและดำเนินการ

  • เมื่อข้อความได้รับการประมวลผลผู้บริโภคจะส่งการตอบรับไปยังนายหน้าคาฟคา

  • เมื่อ Kafka ได้รับการตอบรับแล้วจะเปลี่ยนค่าชดเชยเป็นค่าใหม่และอัปเดตใน Zookeeper เนื่องจากการชดเชยยังคงอยู่ใน Zookeeper ผู้บริโภคจึงสามารถอ่านข้อความถัดไปได้อย่างถูกต้องแม้ในช่วงที่เซิร์ฟเวอร์หยุดทำงาน

  • ขั้นตอนข้างต้นนี้จะทำซ้ำจนกว่าผู้บริโภคจะหยุดคำขอ

  • ผู้บริโภคมีตัวเลือกในการกรอกลับ / ข้ามไปยังออฟเซ็ตที่ต้องการได้ตลอดเวลาและอ่านข้อความที่ตามมาทั้งหมด

เวิร์กโฟลว์ของ Queue Messaging / Consumer Group

ในระบบส่งข้อความคิวแทนที่จะเป็นผู้บริโภครายเดียวกลุ่มผู้บริโภคที่มีรหัสกลุ่มเดียวกันจะสมัครรับหัวข้อ พูดง่ายๆคือผู้บริโภคที่สมัครรับหัวข้อด้วยGroup IDเดียวกันถือเป็นกลุ่มเดียวและมีการแชร์ข้อความระหว่างกัน ให้เราตรวจสอบขั้นตอนการทำงานจริงของระบบนี้

  • ผู้ผลิตส่งข้อความถึงหัวข้อในช่วงเวลาปกติ

  • Kafka จัดเก็บข้อความทั้งหมดในพาร์ติชันที่กำหนดค่าไว้สำหรับหัวข้อนั้น ๆ ซึ่งคล้ายกับสถานการณ์ก่อนหน้านี้

  • ผู้บริโภคเดียวเป็นสมาชิกกับหัวข้อที่เฉพาะเจาะจงเช่นสมมติกระทู้-01กับรหัสกลุ่มเป็นกลุ่มที่ 1

  • ปฏิสัมพันธ์ Kafka กับผู้บริโภคในลักษณะเดียวกับผับ-Sub Messaging จนผู้บริโภคใหม่สมัครหัวข้อเดียวกันกระทู้-01แบบเดียวกับที่ID กลุ่มเป็นกลุ่มที่ 1

  • เมื่อผู้บริโภครายใหม่มาถึง Kafka จะเปลี่ยนการทำงานเป็นโหมดแบ่งปันและแบ่งปันข้อมูลระหว่างผู้บริโภคทั้งสอง การแบ่งปันนี้จะดำเนินต่อไปจนกว่าจำนวนผู้ร่วมก่อเหตุจะถึงจำนวนพาร์ติชันที่กำหนดค่าไว้สำหรับหัวข้อนั้น ๆ

  • เมื่อจำนวนผู้บริโภคเกินจำนวนพาร์ติชันผู้ใช้รายใหม่จะไม่ได้รับข้อความใด ๆ อีกจนกว่าผู้บริโภคที่มีอยู่จะยกเลิกการเป็นสมาชิก สถานการณ์นี้เกิดขึ้นเนื่องจากผู้บริโภคแต่ละรายใน Kafka จะได้รับมอบหมายอย่างน้อยหนึ่งพาร์ติชันและเมื่อกำหนดพาร์ติชันทั้งหมดให้กับผู้บริโภคที่มีอยู่แล้วผู้บริโภครายใหม่จะต้องรอ

  • คุณลักษณะนี้จะเรียกว่าเป็นกลุ่มผู้บริโภค ในทำนองเดียวกัน Kafka จะมอบสิ่งที่ดีที่สุดให้กับทั้งสองระบบในลักษณะที่เรียบง่ายและมีประสิทธิภาพ

บทบาทของ ZooKeeper

การพึ่งพาที่สำคัญของ Apache Kafka คือ Apache Zookeeper ซึ่งเป็นบริการกำหนดค่าและซิงโครไนซ์แบบกระจาย Zookeeper ทำหน้าที่เป็นอินเทอร์เฟซการประสานงานระหว่างโบรกเกอร์ Kafka และผู้บริโภค เซิร์ฟเวอร์ Kafka แบ่งปันข้อมูลผ่านคลัสเตอร์ Zookeeper Kafka จัดเก็บข้อมูลเมตาพื้นฐานใน Zookeeper เช่นข้อมูลเกี่ยวกับหัวข้อนายหน้าการชดเชยผู้บริโภค (ตัวอ่านคิว) เป็นต้น

เนื่องจากข้อมูลที่สำคัญทั้งหมดจะถูกเก็บไว้ใน Zookeeper และโดยปกติจะจำลองข้อมูลนี้ในทั้งชุดความล้มเหลวของนายหน้า Kafka / Zookeeper จึงไม่ส่งผลกระทบต่อสถานะของคลัสเตอร์ Kafka คาฟคาจะคืนสถานะเมื่อ Zookeeper รีสตาร์ท ทำให้ Kafka หยุดทำงานเป็นศูนย์ การเลือกตั้งผู้นำระหว่างนายหน้าคาฟคายังทำได้โดยใช้ Zookeeper ในกรณีที่ผู้นำล้มเหลว

ต้องการเรียนรู้เพิ่มเติมเกี่ยวกับการดูแลสัตว์โปรดดูZookeeper

ให้เราดำเนินการเพิ่มเติมเกี่ยวกับวิธีการติดตั้ง Java, ZooKeeper และ Kafka บนเครื่องของคุณในบทถัดไป

ต่อไปนี้เป็นขั้นตอนในการติดตั้ง Java บนเครื่องของคุณ

ขั้นตอนที่ 1 - ตรวจสอบการติดตั้ง Java

หวังว่าคุณได้ติดตั้ง java บนเครื่องของคุณแล้วดังนั้นคุณเพียงแค่ยืนยันโดยใช้คำสั่งต่อไปนี้

$ java -version

หากติดตั้ง java บนเครื่องของคุณสำเร็จคุณจะเห็นเวอร์ชันของ Java ที่ติดตั้ง

ขั้นตอนที่ 1.1 - ดาวน์โหลด JDK

หากไม่ได้ดาวน์โหลด Java โปรดดาวน์โหลด JDK เวอร์ชันล่าสุดโดยไปที่ลิงค์ต่อไปนี้และดาวน์โหลดเวอร์ชันล่าสุด

http://www.oracle.com/technetwork/java/javase/downloads/index.html

ตอนนี้เวอร์ชันล่าสุดคือ JDK 8u 60 และไฟล์คือ“ jdk-8u60-linux-x64.tar.gz” โปรดดาวน์โหลดไฟล์บนเครื่องของคุณ

ขั้นตอนที่ 1.2 - แตกไฟล์

โดยทั่วไปไฟล์ที่กำลังดาวน์โหลดจะถูกเก็บไว้ในโฟลเดอร์ดาวน์โหลดตรวจสอบและแยกการตั้งค่า tar โดยใช้คำสั่งต่อไปนี้

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

ขั้นตอนที่ 1.3 - ย้ายไปที่ Opt Directory

ในการทำให้ java พร้อมใช้งานสำหรับผู้ใช้ทั้งหมดให้ย้ายเนื้อหา java ที่แยกแล้วไปยังusr / local / java / folder

$ su
password: (type password of root user)
$ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/

ขั้นตอนที่ 1.4 - กำหนดเส้นทาง

ในการกำหนดเส้นทางและตัวแปร JAVA_HOME ให้เพิ่มคำสั่งต่อไปนี้ในไฟล์ ~ / .bashrc

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

ตอนนี้ใช้การเปลี่ยนแปลงทั้งหมดในระบบที่กำลังทำงานอยู่

$ source ~/.bashrc

ขั้นตอนที่ 1.5 - ทางเลือก Java

ใช้คำสั่งต่อไปนี้เพื่อเปลี่ยน Java Alternatives

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Step 1.6 - ตอนนี้ตรวจสอบ java โดยใช้คำสั่งการตรวจสอบ (java -version) ที่อธิบายไว้ในขั้นตอนที่ 1

ขั้นตอนที่ 2 - การติดตั้ง ZooKeeper Framework

ขั้นตอนที่ 2.1 - ดาวน์โหลด ZooKeeper

ในการติดตั้ง ZooKeeper framework บนเครื่องของคุณให้ไปที่ลิงค์ต่อไปนี้และดาวน์โหลด ZooKeeper เวอร์ชันล่าสุด

http://zookeeper.apache.org/releases.html

ณ ตอนนี้ ZooKeeper เวอร์ชันล่าสุดคือ 3.4.6 (ZooKeeper-3.4.6.tar.gz)

ขั้นตอนที่ 2.2 - แตกไฟล์ tar

แตกไฟล์ tar โดยใช้คำสั่งต่อไปนี้

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6
$ mkdir data

ขั้นตอนที่ 2.3 - สร้างไฟล์กำหนดค่า

เปิดไฟล์คอนฟิกูเรชันชื่อconf / zoo.cfgโดยใช้คำสั่ง vi“ conf / zoo.cfg” และพารามิเตอร์ต่อไปนี้ทั้งหมดเพื่อตั้งเป็นจุดเริ่มต้น

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

เมื่อบันทึกไฟล์คอนฟิกูเรชันเรียบร้อยแล้วและกลับไปที่เทอร์มินัลอีกครั้งคุณสามารถเริ่มเซิร์ฟเวอร์ Zookeeper ได้

ขั้นตอนที่ 2.4 - เริ่มเซิร์ฟเวอร์ ZooKeeper

$ bin/zkServer.sh start

หลังจากดำเนินการคำสั่งนี้คุณจะได้รับคำตอบตามที่แสดงด้านล่าง -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED

ขั้นตอนที่ 2.5 - เริ่ม CLI

$ bin/zkCli.sh

หลังจากพิมพ์คำสั่งด้านบนคุณจะเชื่อมต่อกับเซิร์ฟเวอร์ Zookeeper และจะได้รับคำตอบด้านล่าง

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

ขั้นตอนที่ 2.6 - หยุดเซิร์ฟเวอร์ Zookeeper

หลังจากเชื่อมต่อเซิร์ฟเวอร์และดำเนินการทั้งหมดแล้วคุณสามารถหยุดเซิร์ฟเวอร์ Zookeeper ได้ด้วยคำสั่งต่อไปนี้ -

$ bin/zkServer.sh stop

ตอนนี้คุณได้ติดตั้ง Java และ ZooKeeper บนเครื่องของคุณเรียบร้อยแล้ว ให้เราดูขั้นตอนในการติดตั้ง Apache Kafka

ขั้นตอนที่ 3 - การติดตั้ง Apache Kafka

ให้เราทำตามขั้นตอนต่อไปนี้เพื่อติดตั้ง Kafka บนเครื่องของคุณ

ขั้นตอนที่ 3.1 - ดาวน์โหลด Kafka

ในการติดตั้ง Kafka บนเครื่องของคุณคลิกที่ลิงค์ด้านล่าง -

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

ตอนนี้เวอร์ชันล่าสุดคือ - kafka_2.11_0.9.0.0.tgz จะถูกดาวน์โหลดลงในเครื่องของคุณ

ขั้นตอนที่ 3.2 - แตกไฟล์ tar

แตกไฟล์ tar โดยใช้คำสั่งต่อไปนี้ -

$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

ตอนนี้คุณได้ดาวน์โหลด Kafka เวอร์ชันล่าสุดบนเครื่องของคุณแล้ว

ขั้นตอนที่ 3.3 - เริ่มเซิร์ฟเวอร์

คุณสามารถเริ่มต้นเซิร์ฟเวอร์โดยให้คำสั่งต่อไปนี้ -

$ bin/kafka-server-start.sh config/server.properties

หลังจากเซิร์ฟเวอร์เริ่มทำงานคุณจะเห็นคำตอบด้านล่างบนหน้าจอของคุณ -

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

ขั้นตอนที่ 4 - หยุดเซิร์ฟเวอร์

หลังจากดำเนินการทั้งหมดแล้วคุณสามารถหยุดเซิร์ฟเวอร์ได้โดยใช้คำสั่งต่อไปนี้ -

$ bin/kafka-server-stop.sh config/server.properties

ตอนนี้เราได้พูดคุยเกี่ยวกับการติดตั้ง Kafka แล้วเราสามารถเรียนรู้วิธีดำเนินการพื้นฐานบน Kafka ได้ในบทถัดไป

ขั้นแรกให้เราเริ่มใช้การกำหนดค่าโบรกเกอร์โหนดเดียวจากนั้นเราจะย้ายการตั้งค่าของเราไปยังการกำหนดค่าโบรกเกอร์หลายโหนดเดียว

หวังว่าตอนนี้คุณจะติดตั้ง Java, ZooKeeper และ Kafka บนเครื่องของคุณแล้ว ก่อนที่จะย้ายไปที่ Kafka Cluster Setup ก่อนอื่นคุณต้องเริ่ม ZooKeeper ของคุณเนื่องจาก Kafka Cluster ใช้ ZooKeeper

เริ่ม ZooKeeper

เปิดเทอร์มินัลใหม่และพิมพ์คำสั่งต่อไปนี้ -

bin/zookeeper-server-start.sh config/zookeeper.properties

ในการเริ่ม Kafka Broker ให้พิมพ์คำสั่งต่อไปนี้ -

bin/kafka-server-start.sh config/server.properties

หลังจากเริ่ม Kafka Broker พิมพ์คำสั่งjpsบนเทอร์มินัล ZooKeeper และคุณจะเห็นคำตอบต่อไปนี้ -

821 QuorumPeerMain
928 Kafka
931 Jps

ตอนนี้คุณสามารถเห็นภูตสองตัวที่ทำงานบนเทอร์มินัลโดยที่ QuorumPeerMain คือ ZooKeeper daemon และอีกตัวคือ Kafka daemon

การกำหนดค่าโบรกเกอร์โหนดเดียว - เดี่ยว

ในการกำหนดค่านี้คุณมีอินสแตนซ์ ZooKeeper และรหัสนายหน้าเพียงชุดเดียว ต่อไปนี้เป็นขั้นตอนในการกำหนดค่า -

Creating a Kafka Topic- Kafka มียูทิลิตี้บรรทัดคำสั่งชื่อkafka-topics.shเพื่อสร้างหัวข้อบนเซิร์ฟเวอร์ เปิดเทอร์มินัลใหม่และพิมพ์ตัวอย่างด้านล่าง

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

เราเพิ่งสร้างหัวข้อชื่อHello-Kafkaด้วยพาร์ติชันเดียวและตัวประกอบการจำลองหนึ่งตัว ผลลัพธ์ที่สร้างขึ้นข้างต้นจะคล้ายกับผลลัพธ์ต่อไปนี้ -

Output- สร้างหัวข้อHello-Kafka

เมื่อสร้างหัวข้อแล้วคุณจะได้รับการแจ้งเตือนในหน้าต่างเทอร์มินัลโบรกเกอร์ Kafka และบันทึกสำหรับหัวข้อที่สร้างขึ้นซึ่งระบุไว้ใน“ / tmp / kafka-logs /“ ในไฟล์ config / server.properties

รายการหัวข้อ

ในการรับรายการหัวข้อในเซิร์ฟเวอร์ Kafka คุณสามารถใช้คำสั่งต่อไปนี้ -

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

เนื่องจากเราได้สร้างหัวข้อจึงจะแสดงเฉพาะHello-Kafkaเท่านั้น สมมติว่าหากคุณสร้างมากกว่าหนึ่งหัวข้อคุณจะได้รับชื่อหัวข้อในผลลัพธ์

เริ่ม Producer เพื่อส่งข้อความ

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

จากไวยากรณ์ข้างต้นจำเป็นต้องมีพารามิเตอร์หลักสองตัวสำหรับไคลเอนต์บรรทัดคำสั่งผู้ผลิต -

Broker-list- รายชื่อโบรกเกอร์ที่เราต้องการส่งข้อความไป ในกรณีนี้เรามีนายหน้าเพียงรายเดียว ไฟล์ Config / server.properties มี id พอร์ตโบรกเกอร์เนื่องจากเราทราบว่าโบรกเกอร์ของเรากำลังฟังพอร์ต 9092 ดังนั้นคุณจึงสามารถระบุได้โดยตรง

ชื่อหัวข้อ - นี่คือตัวอย่างสำหรับชื่อหัวข้อ

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

โปรดิวเซอร์จะรอข้อมูลจาก stdin และเผยแพร่ไปยังคลัสเตอร์ Kafka โดยดีฟอลต์ทุกบรรทัดใหม่จะถูกเผยแพร่เป็นข้อความใหม่จากนั้นคุณสมบัติของผู้ผลิตเริ่มต้นจะถูกระบุในไฟล์config / producer.properties ตอนนี้คุณสามารถพิมพ์ข้อความสองสามบรรทัดในเทอร์มินัลดังที่แสดงด้านล่าง

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

เริ่มให้ผู้บริโภครับข้อความ

คล้ายกับการผลิตคุณสมบัติของผู้บริโภคเริ่มต้นที่ระบุไว้ในconfig / consumer.proper สัมพันธ์ไฟล์ เปิดเทอร์มินัลใหม่และพิมพ์ไวยากรณ์ด้านล่างเพื่อใช้ข้อความ

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

สุดท้ายคุณสามารถป้อนข้อความจากเทอร์มินัลของผู้ผลิตและเห็นข้อความเหล่านี้ปรากฏในเทอร์มินัลของผู้บริโภค ณ ตอนนี้คุณมีความเข้าใจเป็นอย่างดีเกี่ยวกับคลัสเตอร์โหนดเดียวกับโบรกเกอร์เดียว ให้เราไปยังการกำหนดค่าโบรกเกอร์หลายรายการ

การกำหนดค่าโบรกเกอร์หลายโหนดเดียว

ก่อนที่จะย้ายไปยังการตั้งค่าคลัสเตอร์โบรกเกอร์หลายรายการให้เริ่มเซิร์ฟเวอร์ ZooKeeper ของคุณก่อน

Create Multiple Kafka Brokers- เรามีอินสแตนซ์โบรกเกอร์ Kafka หนึ่งรายการอยู่แล้วใน con-fig / server.properties ตอนนี้เราต้องการอินสแตนซ์โบรกเกอร์หลายอินสแตนซ์ดังนั้นให้คัดลอกไฟล์ server.prop-erties ที่มีอยู่ลงในไฟล์ config ใหม่สองไฟล์และเปลี่ยนชื่อเป็น server-one.properties และ server-two.prop-erties จากนั้นแก้ไขไฟล์ใหม่ทั้งสองไฟล์และกำหนดการเปลี่ยนแปลงต่อไปนี้ -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers- หลังจากทำการเปลี่ยนแปลงทั้งหมดบนเซิร์ฟเวอร์สามเครื่องแล้วให้เปิดเทอร์มินัลใหม่สามเทอร์มินัลเพื่อเริ่มโบรกเกอร์ทีละรายการ

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

ตอนนี้เรามีโบรกเกอร์สามแห่งที่ทำงานบนเครื่อง ลองด้วยตัวเองเพื่อตรวจสอบภูตทั้งหมดโดยพิมพ์jps บนเทอร์มินัล ZooKeeper จากนั้นคุณจะเห็นการตอบสนอง

การสร้างหัวข้อ

ให้เรากำหนดค่าปัจจัยการจำลองเป็นสามสำหรับหัวข้อนี้เนื่องจากเรามีโบรกเกอร์สามแห่งที่ทำงานอยู่ หากคุณมีโบรกเกอร์สองรายค่าแบบจำลองที่กำหนดจะเป็นสอง

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

อธิบายคำสั่งที่ใช้ในการตรวจสอบซึ่งนายหน้าจะฟังในหัวข้อสร้างขึ้นในปัจจุบันที่แสดงด้านล่าง -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

จากผลลัพธ์ข้างต้นเราสามารถสรุปได้ว่าบรรทัดแรกให้ข้อมูลสรุปของพาร์ติชันทั้งหมดแสดงชื่อหัวข้อจำนวนพาร์ติชันและปัจจัยการจำลองแบบที่เราได้เลือกไว้แล้ว ในบรรทัดที่สองแต่ละโหนดจะเป็นผู้นำสำหรับส่วนที่สุ่มเลือกของพาร์ติชัน

ในกรณีของเราเราเห็นว่าโบรกเกอร์รายแรกของเรา (ที่มีโบรกเกอร์ id 0) เป็นผู้นำ จากนั้น Replicas: 0,2,1 หมายความว่าโบรกเกอร์ทั้งหมดทำซ้ำหัวข้อในที่สุดIsrก็คือชุดของแบบจำลองในการซิงค์ นี่คือส่วนย่อยของแบบจำลองที่มีชีวิตอยู่ในขณะนี้และถูกติดตามโดยผู้นำ

เริ่ม Producer เพื่อส่งข้อความ

ขั้นตอนนี้ยังคงเหมือนกับในการตั้งค่านายหน้าเดี่ยว

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

เริ่มให้ผู้บริโภครับข้อความ

ขั้นตอนนี้ยังคงเหมือนเดิมตามที่แสดงในการตั้งค่านายหน้าเดี่ยว

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

การใช้งานหัวข้อพื้นฐาน

ในบทนี้เราจะพูดถึงการดำเนินงานของหัวข้อพื้นฐานต่างๆ

การแก้ไขหัวข้อ

ดังที่คุณได้เข้าใจวิธีสร้างหัวข้อใน Kafka Cluster แล้ว ตอนนี้ให้เราแก้ไขหัวข้อที่สร้างขึ้นโดยใช้คำสั่งต่อไปนี้

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

การลบหัวข้อ

ในการลบหัวข้อคุณสามารถใช้ไวยากรณ์ต่อไปนี้

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −สิ่งนี้จะไม่มีผลกระทบหาก delete.topic.enable ไม่ได้ตั้งค่าเป็นจริง

ให้เราสร้างแอปพลิเคชันสำหรับเผยแพร่และใช้งานข้อความโดยใช้ไคลเอ็นต์ Java ไคลเอ็นต์ผู้ผลิต Kafka ประกอบด้วย API ต่อไปนี้

KafkaProducer API

ให้เราเข้าใจชุดที่สำคัญที่สุดของ Kafka Producer API ในส่วนนี้ ส่วนกลางของ KafkaProducer API คือคลาสKafkaProducer คลาส KafkaProducer มีตัวเลือกในการเชื่อมต่อโบรกเกอร์ Kafka ในคอนสตรัคเตอร์ด้วยวิธีการดังต่อไปนี้

  • คลาส KafkaProducer จัดเตรียมวิธีการส่งเพื่อส่งข้อความไปยังหัวข้อแบบอะซิงโครนัส ลายเซ็นของ send () มีดังต่อไปนี้

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - ผู้ผลิตจัดการบัฟเฟอร์ของเร็กคอร์ดที่รอการส่ง

  • Callback - การโทรกลับที่ผู้ใช้จัดหาเพื่อดำเนินการเมื่อเซิร์ฟเวอร์ได้รับการยอมรับว่าบันทึกได้รับการปรับปรุง (null หมายถึงไม่มีการโทรกลับ)

  • คลาส KafkaProducer มีวิธีการล้างเพื่อให้แน่ใจว่าข้อความที่ส่งไปก่อนหน้านี้เสร็จสมบูรณ์แล้วจริงๆ ไวยากรณ์ของวิธีการล้างมีดังนี้ -

public void flush()
  • คลาส KafkaProducer จัดเตรียมเมธอด partitionFor ซึ่งช่วยในการรับข้อมูลเมตาของพาร์ติชันสำหรับหัวข้อที่กำหนด สามารถใช้สำหรับการแบ่งพาร์ติชันแบบกำหนดเอง ลายเซ็นของวิธีนี้มีดังนี้ -

public Map metrics()

ส่งคืนแผนที่ของเมตริกภายในที่ดูแลโดยผู้สร้าง

  • public void close () - คลาส KafkaProducer จัดเตรียมบล็อกวิธีการปิดจนกว่าคำขอที่ส่งก่อนหน้านี้ทั้งหมดจะเสร็จสมบูรณ์

Producer API

ส่วนกลางของ Producer API คือคลาสProducer คลาส Producer มีตัวเลือกในการเชื่อมต่อโบรกเกอร์ Kafka ในตัวสร้างโดยวิธีการต่อไปนี้

คลาส Producer

คลาสผู้ผลิตจัดเตรียมวิธีการส่งไปยัง send ข้อความไปยังหัวข้อเดียวหรือหลายหัวข้อโดยใช้ลายเซ็นต่อไปนี้

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

ผู้ผลิตมีสองประเภท - Sync และ Async.

การกำหนดค่า API เดียวกันใช้กับผู้ผลิตSyncเช่นกัน ความแตกต่างระหว่างพวกเขาคือผู้ผลิตซิงค์ส่งข้อความโดยตรง แต่ส่งข้อความในพื้นหลัง ผู้ผลิต Async เป็นที่ต้องการเมื่อคุณต้องการทรูพุตที่สูงขึ้น ในรีลีสก่อนหน้านี้เช่น 0.8 ผู้ผลิต async ไม่มีการเรียกกลับสำหรับ send () เพื่อลงทะเบียนตัวจัดการข้อผิดพลาด สิ่งนี้มีให้เฉพาะในรุ่นปัจจุบัน 0.9

โมฆะสาธารณะปิด ()

คลาส Producer ให้ close วิธีปิดการเชื่อมต่อพูลผู้ผลิตกับ Kafka bro-kers ทั้งหมด

การตั้งค่าการกำหนดค่า

การตั้งค่าคอนฟิกูเรชันหลักของ Producer API แสดงอยู่ในตารางต่อไปนี้เพื่อให้อยู่ในสถานะต่ำกว่า -

ส. เลขที่ การตั้งค่าการกำหนดค่าและคำอธิบาย
1

client.id

ระบุแอปพลิเคชันผู้ผลิต

2

producer.type

ไม่ว่าจะซิงค์หรือไม่ซิงค์

3

acks

การกำหนดค่า acks ควบคุมเกณฑ์ภายใต้คำร้องขอของผู้ผลิตนั้นเป็นไปตามความสมบูรณ์

4

retries

หากคำขอโปรดิวเซอร์ล้มเหลวให้ลองใหม่โดยอัตโนมัติด้วยค่าเฉพาะ

5

bootstrap.servers

bootstrapping รายชื่อโบรกเกอร์

6

linger.ms

หากคุณต้องการลดจำนวนคำขอคุณสามารถตั้งค่า linger.ms เป็นค่าที่มากกว่าค่าบางค่าได้

7

key.serializer

คีย์สำหรับอินเทอร์เฟซ Serializer

8

value.serializer

ค่าสำหรับอินเทอร์เฟซ Serializer

9

batch.size

ขนาดบัฟเฟอร์

10

buffer.memory

ควบคุมจำนวนหน่วยความจำทั้งหมดที่พร้อมใช้งานสำหรับผู้สร้างสำหรับการบัฟเฟอร์

ProducerRecord API

ProducerRecord เป็นคู่คีย์ / ค่าที่ส่งไปยังคลัสเตอร์ Kafka ตัวสร้างคลาส ProducerRecord สำหรับสร้างเรกคอร์ดที่มีพาร์ติชันคู่คีย์และค่าโดยใช้ลายเซ็นต่อไปนี้

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - ชื่อหัวข้อที่ผู้ใช้กำหนดซึ่งจะต่อท้ายเพื่อบันทึก

  • Partition - จำนวนพาร์ติชัน

  • Key - คีย์ที่จะรวมอยู่ในบันทึก

  • Value - บันทึกเนื้อหา
public ProducerRecord (string topic, k key, v value)

ตัวสร้างคลาส ProducerRecord ใช้เพื่อสร้างเรกคอร์ดที่มีคีย์คู่ค่าและไม่มีพาร์ติชัน

  • Topic - สร้างหัวข้อเพื่อกำหนดบันทึก

  • Key - คีย์สำหรับบันทึก

  • Value - บันทึกเนื้อหา

public ProducerRecord (string topic, v value)

คลาส ProducerRecord สร้างเร็กคอร์ดโดยไม่มีพาร์ติชันและคีย์

  • Topic - สร้างหัวข้อ

  • Value - บันทึกเนื้อหา

เมธอดคลาส ProducerRecord แสดงอยู่ในตารางต่อไปนี้ -

ส. เลขที่ วิธีการเรียนและคำอธิบาย
1

public string topic()

หัวข้อจะต่อท้ายบันทึก

2

public K key()

คีย์ที่จะรวมอยู่ในบันทึก หากไม่มีคีย์ดังกล่าวค่าว่างจะถูกเปลี่ยนใหม่ที่นี่

3

public V value()

บันทึกเนื้อหา

4

partition()

จำนวนพาร์ติชันสำหรับบันทึก

แอปพลิเคชั่น SimpleProducer

ก่อนสร้างแอปพลิเคชันขั้นแรกให้เริ่มโบรกเกอร์ ZooKeeper และ Kafka จากนั้นสร้างหัวข้อของคุณเองในโบรกเกอร์คาฟคาโดยใช้คำสั่งสร้างหัวข้อ หลังจากนั้นสร้างคลาส java ชื่อSim-pleProducer.javaแล้วพิมพ์ coding ต่อไปนี้

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation - สามารถคอมไพล์แอปพลิเคชันได้โดยใช้คำสั่งต่อไปนี้

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution - แอปพลิเคชันสามารถดำเนินการได้โดยใช้คำสั่งต่อไปนี้

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

ตัวอย่างผู้บริโภคอย่างง่าย

ณ ตอนนี้เราได้สร้างผู้ผลิตเพื่อส่งข้อความไปยังคลัสเตอร์ Kafka ตอนนี้ให้เราสร้างผู้บริโภคเพื่อบริโภคข้อความจากคลัสเตอร์ Kafka KafkaConsumer API ใช้เพื่อบริโภคข้อความจากคลัสเตอร์ Kafka ตัวสร้างคลาส KafkaConsumer ถูกกำหนดไว้ด้านล่าง

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - ส่งคืนแผนที่ของการกำหนดค่าผู้บริโภค

คลาส KafkaConsumer มีวิธีการที่สำคัญดังต่อไปนี้ซึ่งแสดงไว้ในตารางด้านล่าง

ส. เลขที่ วิธีการและคำอธิบาย
1

public java.util.Set<TopicPar-tition> assignment()

รับชุดของพาร์ติชั่นที่ผู้ผลิตคอนซูเมอร์มอบหมาย

2

public string subscription()

สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก

4

public void unsubscribe()

ยกเลิกการสมัครหัวข้อจากรายการพาร์ติชันที่กำหนด

5

public void sub-scribe(java.util.List<java.lang.String> topics)

สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก หากรายการหัวข้อที่ระบุว่างเปล่าจะถือว่าเหมือนกับการยกเลิกการสมัคร ()

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

รูปแบบอาร์กิวเมนต์หมายถึงรูปแบบการสมัครสมาชิกในรูปแบบของนิพจน์ทั่วไปและอาร์กิวเมนต์ Listener จะได้รับการแจ้งเตือนจากรูปแบบการสมัครสมาชิก

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

กำหนดรายการพาร์ติชันให้กับลูกค้าด้วยตนเอง

8

poll()

ดึงข้อมูลสำหรับหัวข้อหรือพาร์ติชันที่ระบุโดยใช้หนึ่งในสมัคร / กำหนด API สิ่งนี้จะส่งคืนข้อผิดพลาดหากไม่ได้สมัครหัวข้อก่อนการสำรวจข้อมูล

9

public void commitSync()

คอมมิตออฟเซ็ตที่ส่งคืนในแบบสำรวจล่าสุด () สำหรับรายการหัวข้อและพาร์ติชันย่อยที่เขียนไว้ทั้งหมด การดำเนินการเดียวกันนี้ถูกนำไปใช้กับคอมมิต Asyn ()

10

public void seek(TopicPartition partition, long offset)

ดึงค่าชดเชยปัจจุบันที่ผู้บริโภคจะใช้ในวิธีการสำรวจความคิดเห็น () ถัดไป

11

public void resume()

ดำเนินการต่อพาร์ติชันที่หยุดชั่วคราว

12

public void wakeup()

ปลุกผู้บริโภค

ConsumerRecord API

ConsumerRecord API ใช้เพื่อรับเรกคอร์ดจากคลัสเตอร์ Kafka API นี้ประกอบด้วยชื่อหัวข้อหมายเลขพาร์ติชันซึ่งจะได้รับเร็กคอร์ดและออฟเซ็ตที่ชี้ไปยังเร็กคอร์ดในพาร์ติชัน Kafka คลาส ConsumerRecord ใช้เพื่อสร้างเรกคอร์ดผู้บริโภคที่มีชื่อหัวข้อเฉพาะจำนวนพาร์ติชันและคู่ <คีย์ค่า> มีลายเซ็นดังต่อไปนี้

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - ชื่อหัวข้อสำหรับบันทึกผู้บริโภคที่ได้รับจากคลัสเตอร์ Kafka

  • Partition - พาร์ติชันสำหรับหัวข้อ

  • Key - คีย์ของเรกคอร์ดหากไม่มีคีย์อยู่จะถูกส่งกลับ

  • Value - บันทึกเนื้อหา

ConsumerRecords API

ConsumerRecords API ทำหน้าที่เป็นคอนเทนเนอร์สำหรับ ConsumerRecord API นี้ใช้เพื่อเก็บรายการ ConsumerRecord ต่อพาร์ติชันสำหรับหัวข้อเฉพาะ ตัวสร้างของมันถูกกำหนดไว้ด้านล่าง

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - ส่งคืนแผนที่พาร์ติชันสำหรับหัวข้อใดหัวข้อหนึ่ง

  • Records - รายการส่งคืน ConsumerRecord

คลาส ConsumerRecords มีการกำหนดวิธีการดังต่อไปนี้

ส. เลขที่ วิธีการและคำอธิบาย
1

public int count()

จำนวนบันทึกสำหรับหัวข้อทั้งหมด

2

public Set partitions()

ชุดของพาร์ติชันที่มีข้อมูลในชุดระเบียนนี้ (หากไม่มีการส่งคืนข้อมูลชุดนั้นจะว่างเปล่า)

3

public Iterator iterator()

Iterator ช่วยให้คุณสามารถวนรอบคอลเลกชันรับหรือย้ายองค์ประกอบได้

4

public List records()

รับรายการบันทึกสำหรับพาร์ติชันที่กำหนด

การตั้งค่าการกำหนดค่า

การตั้งค่าคอนฟิกูเรชันสำหรับการตั้งค่าคอนฟิกูเรชันหลักของ Consumer client API แสดงไว้ด้านล่าง -

ส. เลขที่ การตั้งค่าและคำอธิบาย
1

bootstrap.servers

Bootstrapping รายชื่อโบรกเกอร์

2

group.id

กำหนดผู้บริโภคแต่ละรายให้กับกลุ่ม

3

enable.auto.commit

เปิดใช้งานการคอมมิตอัตโนมัติสำหรับการชดเชยหากค่าเป็นจริงมิฉะนั้นจะไม่ถูกคอมมิต

4

auto.commit.interval.ms

ย้อนกลับว่าจะเขียนค่าออฟเซ็ตที่บริโภคที่อัปเดตบ่อยเพียงใดไปยัง ZooKeeper

5

session.timeout.ms

ระบุจำนวนมิลลิวินาทีที่ Kafka จะรอให้ ZooKeeper ตอบกลับคำขอ (อ่านหรือเขียน) ก่อนที่จะยอมแพ้และใช้ข้อความต่อไป

แอปพลิเคชั่น SimpleConsumer

ขั้นตอนการสมัครผู้ผลิตยังคงเหมือนเดิมที่นี่ ขั้นแรกเริ่มนายหน้า ZooKeeper และ Kafka จากนั้นสร้างแอปพลิเคชันSimpleConsumerด้วยคลาส java ชื่อSimpleCon-sumer.javaและพิมพ์รหัสต่อไปนี้

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - สามารถคอมไพล์แอปพลิเคชันได้โดยใช้คำสั่งต่อไปนี้

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − แอปพลิเคชันสามารถดำเนินการได้โดยใช้คำสั่งต่อไปนี้

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input- เปิดผู้ผลิต CLI และส่งข้อความไปยังหัวข้อ คุณสามารถใส่ smple input เป็น 'Hello Consumer'

Output - ต่อไปนี้จะเป็นผลลัพธ์

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

กลุ่มผู้บริโภคคือการใช้งานแบบหลายเธรดหรือหลายเครื่องจากหัวข้อ Kafka

กลุ่มผู้บริโภค

  • ผู้บริโภคสามารถเข้าร่วมกลุ่มได้โดยใช้group.idเดียวกัน

  • ความขนานสูงสุดของกลุ่มคือจำนวนผู้บริโภคในกลุ่ม←ไม่มีพาร์ติชัน

  • Kafka กำหนดพาร์ติชันของหัวข้อให้กับผู้บริโภคในกลุ่มเพื่อให้แต่ละพาร์ติชันถูกใช้โดยผู้บริโภคเพียงคนเดียวในกลุ่ม

  • Kafka รับประกันว่าข้อความจะถูกอ่านโดยผู้บริโภครายเดียวในกลุ่มเท่านั้น

  • ผู้บริโภคสามารถดูข้อความตามลำดับที่จัดเก็บไว้ในบันทึก

การปรับสมดุลของผู้บริโภคอีกครั้ง

การเพิ่มกระบวนการ / เธรดมากขึ้นจะทำให้ Kafka ปรับสมดุลใหม่ หากผู้บริโภคหรือนายหน้ารายใดไม่สามารถส่ง heartbeat ไปยัง ZooKeeper ก็สามารถกำหนดค่าใหม่ได้ผ่านคลัสเตอร์ Kafka ในระหว่างการปรับสมดุลใหม่ Kafka จะกำหนดพาร์ติชันที่พร้อมใช้งานให้กับเธรดที่มีอยู่โดยอาจย้ายพาร์ติชันไปยังกระบวนการอื่น

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

การรวบรวม

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

การดำเนินการ

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

ที่นี่เราได้สร้างชื่อกลุ่มตัวอย่างเป็นmy-groupกับผู้บริโภคสองคน ในทำนองเดียวกันคุณสามารถสร้างกลุ่มและจำนวนผู้บริโภคในกลุ่มได้

อินพุต

เปิดผู้ผลิต CLI และส่งข้อความเช่น -

Test consumer group 01
Test consumer group 02

ผลลัพธ์ของกระบวนการแรก

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

ผลลัพธ์ของกระบวนการที่สอง

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

ตอนนี้หวังว่าคุณจะเข้าใจ SimpleConsumer และ ConsumeGroup โดยใช้การสาธิตไคลเอ็นต์ Java ตอนนี้คุณมีแนวคิดเกี่ยวกับวิธีการส่งและรับข้อความโดยใช้ไคลเอ็นต์ Java ให้เราดำเนินการรวม Kafka กับเทคโนโลยีข้อมูลขนาดใหญ่ในบทต่อไป

ในบทนี้เราจะเรียนรู้วิธีการรวม Kafka กับ Apache Storm

เกี่ยวกับ Storm

Storm ถูกสร้างขึ้นโดย Nathan Marz และทีมงานที่ BackType ในช่วงเวลาสั้น ๆ Apache Storm กลายเป็นมาตรฐานสำหรับระบบประมวลผลแบบเรียลไทม์แบบกระจายซึ่งช่วยให้คุณประมวลผลข้อมูลจำนวนมหาศาลได้ สตอร์มเร็วมากและค่ามาตรฐานโอเวอร์คล็อกด้วยการประมวลผลมากกว่าหนึ่งล้านทูเปิลต่อวินาทีต่อโหนด Apache Storm ทำงานอย่างต่อเนื่องโดยใช้ข้อมูลจากแหล่งที่กำหนด (Spouts) และส่งข้อมูลไปยังท่อประมวลผล (Bolts) Com-bined Spouts และ Bolts สร้างโทโพโลยี

บูรณาการกับ Storm

Kafka และ Storm เสริมซึ่งกันและกันอย่างเป็นธรรมชาติและความร่วมมืออันทรงพลังของพวกเขาช่วยให้การวิเคราะห์สตรีมมิ่งแบบเรียลไทม์สำหรับข้อมูลขนาดใหญ่ที่เคลื่อนไหวอย่างรวดเร็ว การรวม Kafka และ Storm ช่วยให้นักพัฒนาสามารถนำเข้าและเผยแพร่สตรีมข้อมูลจากโทโพโลยี Storm ได้ง่ายขึ้น

กระแสความคิด

พวยกาเป็นแหล่งที่มาของลำธาร ตัวอย่างเช่นพวยกาอาจอ่านสิ่งที่เพิ่มขึ้นจากหัวข้อคาฟคาและส่งออกเป็นสตรีม โบลต์ใช้สตรีมอินพุตกระบวนการและอาจส่งกระแสข้อมูลใหม่ Bolts สามารถทำอะไรก็ได้ตั้งแต่การเรียกใช้ฟังก์ชันการกรองสิ่งที่เพิ่มขึ้นการรวมการสตรีมการรวมการสตรีมการพูดคุยกับฐานข้อมูลและอื่น ๆ แต่ละโหนดในโทโพโลยี Storm ทำงานแบบขนาน โทโพโลยีทำงานไปเรื่อย ๆ จนกว่าคุณจะยุติ Storm จะมอบหมายงานที่ล้มเหลวโดยอัตโนมัติ นอกจากนี้สตอร์มยังรับประกันว่าจะไม่มีการสูญหายของข้อมูลแม้ว่าเครื่องจะล่มและข้อความหลุดก็ตาม

ให้เราดูรายละเอียดเกี่ยวกับ API การรวม Kafka-Storm มีสามคลาสหลักในการรวม Kafka กับ Storm มีดังนี้ -

BrokerHosts - ZkHosts & StaticHosts

BrokerHosts เป็นอินเทอร์เฟซและ ZkHosts และ StaticHosts เป็นสองการใช้งานหลัก ZkHosts ใช้เพื่อติดตามโบรกเกอร์ Kafka แบบไดนามิกโดยการรักษารายละเอียดใน ZooKeeper ในขณะที่ StaticHosts ใช้เพื่อตั้งค่าโบรกเกอร์ Kafka และรายละเอียดด้วยตนเอง / แบบคงที่ ZkHosts เป็นวิธีที่ง่ายและรวดเร็วในการเข้าถึงโบรกเกอร์ Kafka

ลายเซ็นของ ZkHosts มีดังต่อไปนี้ -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

โดยที่นายหน้า ZkStr คือโฮสต์ ZooKeeper และนายหน้า ZkPath คือเส้นทาง ZooKeeper เพื่อรักษารายละเอียดนายหน้าของคาฟคา

KafkaConfig API

API นี้ใช้เพื่อกำหนดการตั้งค่าคอนฟิกสำหรับคลัสเตอร์ Kafka ลายเซ็นของ Kafka Con-fig ถูกกำหนดไว้ดังนี้

public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts - BrokerHosts สามารถเป็น ZkHosts / StaticHosts

    Topic - ชื่อหัวข้อ

SpoutConfig API

Spoutconfig เป็นส่วนขยายของ KafkaConfig ที่รองรับข้อมูล ZooKeeper เพิ่มเติม

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts - BrokerHosts สามารถใช้งานอินเทอร์เฟซ BrokerHosts ได้

  • Topic - ชื่อหัวข้อ

  • zkRoot - เส้นทางราก ZooKeeper

  • id −พวยกาเก็บสถานะของการชดเชยที่บริโภคใน Zookeeper รหัสควรระบุพวยกาของคุณโดยไม่ซ้ำกัน

SchemeAsMultiScheme

SchemeAsMultiScheme เป็นอินเทอร์เฟซที่กำหนดวิธีการที่ ByteBuffer ที่ใช้จาก Kafka จะเปลี่ยนเป็นพายุทูเพิล ได้มาจาก MultiScheme และยอมรับการใช้งานคลาส Scheme มีการใช้งานคลาส Scheme จำนวนมากและการใช้งานแบบนั้นคือ StringScheme ซึ่งแยกวิเคราะห์ไบต์เป็นสตริงธรรมดา นอกจากนี้ยังควบคุมการตั้งชื่อฟิลด์เอาต์พุตของคุณ ลายเซ็นถูกกำหนดไว้ดังนี้

public SchemeAsMultiScheme(Scheme scheme)
  • Scheme - บัฟเฟอร์ไบต์ที่บริโภคจากคาฟคา

KafkaSpout API

KafkaSpout คือการใช้งานพวยกาของเราซึ่งจะทำงานร่วมกับ Storm มันดึง mes-sages จากหัวข้อ kafka และส่งมันไปยังระบบนิเวศของ Storm ในรูปแบบ tuples KafkaSpout รับรายละเอียดการกำหนดค่าจาก SpoutConfig

ด้านล่างนี้เป็นโค้ดตัวอย่างเพื่อสร้าง Kafka spout แบบง่ายๆ

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

การสร้างกลอน

Bolt เป็นส่วนประกอบที่รับทูเปิลเป็นอินพุตประมวลผลทูเพิลและสร้างสิ่งทอปเปิลใหม่เป็นเอาต์พุต Bolts จะใช้อินเทอร์เฟซ IRichBolt ในโปรแกรมนี้มีการใช้โบลต์สองคลาส WordSplitter-Bolt และ WordCounterBolt เพื่อดำเนินการ

อินเทอร์เฟซ IRichBolt มีวิธีการดังต่อไปนี้ -

  • Prepare- จัดเตรียมโบลต์พร้อมสภาพแวดล้อมในการดำเนินการ ตัวดำเนินการจะเรียกใช้วิธีนี้เพื่อเริ่มต้นพวยกา

  • Execute - ประมวลผลอินพุตทูเพิลเดียว

  • Cleanup - เรียกว่าเมื่อสายฟ้ากำลังจะปิดลง

  • declareOutputFields - ประกาศสคีมาผลลัพธ์ของทูเปิล

ให้เราสร้าง SplitBolt.java ซึ่งใช้ตรรกะในการแยกประโยคออกเป็นคำและ CountBolt.java ซึ่งใช้ตรรกะในการแยกคำที่ไม่ซ้ำกันและนับจำนวนที่เกิดขึ้น

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

ส่งไปยังโทโพโลยี

โทโพโลยีแบบพายุเป็นโครงสร้างแบบ Thrift คลาส TopologyBuilder มีวิธีการที่ง่ายและสะดวกในการสร้างโทโพโลยีที่ซับซ้อน คลาส TopologyBuilder มีเมธอดในการตั้งพวยกา (setSpout) และตั้งค่าโบลต์ (setBolt) ในที่สุด TopologyBuilder ได้ createTopology เพื่อสร้าง to-pology วิธี shuffleGrouping และ fieldsGrouping ช่วยในการตั้งค่าการจัดกลุ่มสตรีมสำหรับพวยกาและสลักเกลียว

Local Cluster- เพื่อวัตถุประสงค์ในการพัฒนาเราสามารถสร้างคลัสเตอร์ท้องถิ่นโดยใช้LocalClusterวัตถุแล้วส่ง topology โดยใช้submitTopologyวิธีการLocalClusterระดับ

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

ก่อนที่จะย้ายการรวบรวมการรวม Kakfa-Storm ต้องการไลบรารี java ไคลเอนต์ ZooKeeper ผู้ดูแล ผู้ดูแลเวอร์ชัน 2.9.1 รองรับ Apache Storm เวอร์ชัน 0.9.5 (ซึ่งเราใช้ในบทช่วยสอนนี้) ดาวน์โหลดไฟล์ jar ที่ระบุด้านล่างและวางไว้ในพา ธ คลาส java

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

หลังจากรวมไฟล์อ้างอิงแล้วให้คอมไพล์โปรแกรมโดยใช้คำสั่งต่อไปนี้

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

การดำเนินการ

เริ่ม Kafka Producer CLI (อธิบายไว้ในบทก่อนหน้า) สร้างหัวข้อใหม่ชื่อmy-first-topicและให้ข้อความตัวอย่างตามที่แสดงด้านล่าง -

hello
kafka
storm
spark
test message
another test message

ตอนนี้เรียกใช้แอปพลิเคชันโดยใช้คำสั่งต่อไปนี้ -

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

ตัวอย่างผลลัพธ์ของแอปพลิเคชันนี้ระบุไว้ด้านล่าง -

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2

ในบทนี้เราจะพูดถึงวิธีการรวม Apache Kafka กับ Spark Streaming API

เกี่ยวกับ Spark

Spark Streaming API ช่วยให้สามารถประมวลผลสตรีมข้อมูลสดที่ปรับขนาดได้ปริมาณงานสูงและทนต่อข้อผิดพลาด ข้อมูลสามารถนำเข้าจากหลายแหล่งเช่น Kafka, Flume, Twitter และอื่น ๆ และสามารถประมวลผลโดยใช้อัลกอริทึมที่ซับซ้อนเช่นฟังก์ชันระดับสูงเช่นแผนที่, ลด, เข้าร่วมและหน้าต่าง ในที่สุดข้อมูลที่ประมวลผลแล้วสามารถส่งออกไปยังระบบไฟล์ฐานข้อมูลและแดชบอร์ดแบบสดได้ Resilient Distributed Datasets (RDD) เป็นโครงสร้างข้อมูลพื้นฐานของ Spark มันเป็นคอลเลกชันของวัตถุที่กระจายไม่เปลี่ยนรูป ชุดข้อมูลแต่ละชุดใน RDD จะแบ่งออกเป็นโลจิคัลพาร์ติชันซึ่งอาจคำนวณจากโหนดต่างๆของคลัสเตอร์

บูรณาการกับ Spark

Kafka เป็นแพลตฟอร์มการส่งข้อความและการผสานรวมที่มีศักยภาพสำหรับสตรีมมิ่ง Spark Kafka ทำหน้าที่เป็นศูนย์กลางสำหรับสตรีมข้อมูลแบบเรียลไทม์และประมวลผลโดยใช้อัลกอริทึมที่ซับซ้อนใน Spark Streaming เมื่อประมวลผลข้อมูลแล้ว Spark Streaming อาจเผยแพร่ผลลัพธ์ในหัวข้อ Kafka อื่นหรือจัดเก็บใน HDFS ฐานข้อมูลหรือแดชบอร์ด แผนภาพต่อไปนี้แสดงให้เห็นถึงการไหลของแนวคิด

ตอนนี้ให้เราดูรายละเอียดเกี่ยวกับ Kafka-Spark API

SparkConf API

แสดงถึงการกำหนดค่าสำหรับแอปพลิเคชัน Spark ใช้เพื่อตั้งค่าพารามิเตอร์ Spark ต่างๆเป็นคู่คีย์ - ค่า

คลาสSparkConfมีวิธีการดังต่อไปนี้ -

  • set(string key, string value) - ตั้งค่าตัวแปรการกำหนดค่า

  • remove(string key) - ลบคีย์ออกจากการกำหนดค่า

  • setAppName(string name) - ตั้งชื่อแอปพลิเคชันสำหรับแอปพลิเคชันของคุณ

  • get(string key) - รับกุญแจ

StreamingContext API

นี่คือจุดเริ่มต้นหลักสำหรับฟังก์ชัน Spark SparkContext แสดงถึงการเชื่อมต่อกับคลัสเตอร์ Spark และสามารถใช้เพื่อสร้าง RDDs, ตัวสะสมและตัวแปรออกอากาศบนคลัสเตอร์ ลายเซ็นถูกกำหนดตามที่แสดงด้านล่าง

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - คลัสเตอร์ URL ที่จะเชื่อมต่อ (เช่น mesos: // host: port, spark: // host: port, local [4])

  • appName - ชื่องานของคุณเพื่อแสดงบน UI เว็บคลัสเตอร์

  • batchDuration - ช่วงเวลาที่ข้อมูลการสตรีมจะถูกแบ่งออกเป็นแบทช์

public StreamingContext(SparkConf conf, Duration batchDuration)

สร้าง StreamingContext โดยจัดเตรียมคอนฟิกูเรชันที่จำเป็นสำหรับ SparkContext ใหม่

  • conf - พารามิเตอร์ Spark

  • batchDuration - ช่วงเวลาที่ข้อมูลการสตรีมจะถูกแบ่งออกเป็นแบทช์

KafkaUtils API

KafkaUtils API ใช้เพื่อเชื่อมต่อคลัสเตอร์ Kafka กับ Spark streaming API นี้มีวิธีการsignifi -cant ลายเซ็นcreateStream ที่กำหนดไว้ด้านล่าง

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

วิธีการที่แสดงด้านบนใช้เพื่อสร้างสตรีมอินพุตที่ดึงข้อความจาก Kafka Brokers

  • ssc - วัตถุ StreamingContext

  • zkQuorum - องค์ประชุมผู้ดูแลสวนสัตว์

  • groupId - รหัสกลุ่มสำหรับผู้บริโภครายนี้

  • topics - ส่งคืนแผนที่หัวข้อที่ต้องการบริโภค

  • storageLevel - ระดับการจัดเก็บเพื่อใช้สำหรับจัดเก็บวัตถุที่ได้รับ

KafkaUtils API มีวิธีการ createDirectStream อีกวิธีหนึ่งซึ่งใช้ในการสร้างสตรีมอินพุตที่ดึงข้อความจาก Kafka Brokers โดยตรงโดยไม่ต้องใช้เครื่องรับใด ๆ สตรีมนี้สามารถรับประกันได้ว่าแต่ละข้อความจาก Kafka จะรวมอยู่ในการเปลี่ยนแปลงเพียงครั้งเดียว

แอปพลิเคชันตัวอย่างทำได้ใน Scala ในการรวบรวมแอปพลิเคชันโปรดดาวน์โหลดและติดตั้งsbtเครื่องมือสร้าง scala (คล้ายกับ maven) รหัสแอปพลิเคชันหลักแสดงอยู่ด้านล่าง

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

สร้างสคริปต์

การผสานรวม spark-kafka ขึ้นอยู่กับการจุดประกายการสตรีมและจุดประกายโถการรวม Kafka สร้างไฟล์ใหม่build.sbtและระบุรายละเอียดแอปพลิเคชันและการอ้างอิง SBTจะดาวน์โหลดขวดที่จำเป็นในขณะที่รวบรวมและการบรรจุแอพลิเคชัน

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

การรวบรวม / บรรจุภัณฑ์

รันคำสั่งต่อไปนี้เพื่อคอมไพล์และแพ็กเกจไฟล์ jar ของแอ็พพลิเคชัน เราจำเป็นต้องส่งไฟล์ jar ลงในคอนโซล spark เพื่อเรียกใช้แอปพลิเคชัน

sbt package

ส่งไปยัง Spark

เริ่ม Kafka Producer CLI (อธิบายไว้ในบทก่อนหน้า) สร้างหัวข้อใหม่ชื่อmy-first-topicและจัดเตรียมข้อความตัวอย่างตามที่แสดงด้านล่าง

Another spark test message

เรียกใช้คำสั่งต่อไปนี้เพื่อส่งแอปพลิเคชันไปยัง spark Console

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

ผลลัพธ์ตัวอย่างของแอปพลิเคชันนี้แสดงไว้ด้านล่าง

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

ให้เราวิเคราะห์แอปพลิเคชันแบบเรียลไทม์เพื่อรับฟีด Twitter ล่าสุดและแฮชแท็ก ก่อนหน้านี้เราได้เห็นการรวม Storm และ Spark เข้ากับ Kafka ในทั้งสองสถานการณ์เราได้สร้าง Kafka Producer (โดยใช้ cli) เพื่อส่งข้อความไปยังระบบนิเวศของ Kafka จากนั้นพายุและประกายไฟจะอ่านข้อความโดยใช้ผู้บริโภค Kafka และฉีดเข้าไปในระบบนิเวศของพายุและประกายไฟตามลำดับ ดังนั้นในทางปฏิบัติเราจำเป็นต้องสร้าง Kafka Producer ซึ่งควรจะ -

  • อ่านฟีด Twitter โดยใช้“ Twitter Streaming API”
  • ประมวลผลฟีด
  • แยก HashTags และ
  • ส่งไปที่คาฟคา

เมื่อKafkaได้รับHashTagsแล้วการผสานรวม Storm / Spark จะได้รับ infor-mation และส่งไปยังระบบนิเวศ Storm / Spark

Twitter Streaming API

คุณสามารถเข้าถึง“ Twitter Streaming API” ในภาษาโปรแกรมใดก็ได้ “ twitter4j” เป็นไลบรารี Java แบบโอเพนซอร์สที่ไม่เป็นทางการซึ่งมีโมดูลที่ใช้ Java เพื่อเข้าถึง“ Twitter Streaming API” ได้อย่างง่ายดาย "twitter4j" เป็นกรอบงานสำหรับผู้ฟังเพื่อเข้าถึงทวีต ในการเข้าถึง“ Twitter Streaming API” เราจำเป็นต้องลงชื่อเข้าใช้บัญชีผู้พัฒนา Twitter และควรได้รับสิ่งต่อไปนี้OAuth รายละเอียดการรับรองความถูกต้อง

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

เมื่อสร้างบัญชีผู้พัฒนาแล้วให้ดาวน์โหลดไฟล์ jar“ twitter4j” และวางไว้ในพา ธ คลาส java

การเข้ารหัสผู้ผลิต Twitter Kafka ที่สมบูรณ์ (KafkaTwitterProducer.java) อยู่ด้านล่าง -

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

การรวบรวม

คอมไพล์แอปพลิเคชันโดยใช้คำสั่งต่อไปนี้ -

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

การดำเนินการ

เปิดสองคอนโซล เรียกใช้แอปพลิเคชันที่รวบรวมไว้ด้านบนดังที่แสดงด้านล่างในคอนโซลเดียว

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

เรียกใช้แอปพลิเคชัน Spark / Storm ใด ๆ ที่อธิบายไว้ในบทก่อนหน้าใน win-dow อื่น ประเด็นหลักที่ควรทราบคือหัวข้อที่ใช้ควรเหมือนกันในทั้งสองกรณี ที่นี่เราใช้“ my-first-topic” เป็นชื่อหัวข้อ

เอาต์พุต

ผลลัพธ์ของแอปพลิเคชั่นนี้จะขึ้นอยู่กับคีย์เวิร์ดและฟีดปัจจุบันของ twitter มีการระบุเอาต์พุตตัวอย่างด้านล่าง (การรวมพายุ)

. . .
food : 1
foodie : 2
burger : 1
. . .

Kafka Tool บรรจุภายใต้“ org.apache.kafka.tools. *. เครื่องมือแบ่งออกเป็นเครื่องมือระบบและเครื่องมือจำลองแบบ

เครื่องมือระบบ

เครื่องมือระบบสามารถเรียกใช้จากบรรทัดคำสั่งโดยใช้สคริปต์คลาสรัน ไวยากรณ์มีดังนี้ -

bin/kafka-run-class.sh package.class - - options

เครื่องมือระบบบางส่วนมีการกล่าวถึงด้านล่าง -

  • Kafka Migration Tool - เครื่องมือนี้ใช้ในการโยกย้ายนายหน้าจากเวอร์ชันหนึ่งไปยังอีกเวอร์ชันหนึ่ง

  • Mirror Maker - เครื่องมือนี้ใช้เพื่อทำการมิเรอร์คลัสเตอร์ Kafka หนึ่งไปยังอีกคลัสเตอร์

  • Consumer Offset Checker - เครื่องมือนี้แสดงกลุ่มผู้บริโภคหัวข้อพาร์ทิชันนอกสถานที่ล็อกขนาดเจ้าของสำหรับชุดหัวข้อและกลุ่มผู้บริโภคที่ระบุ

เครื่องมือจำลองแบบ

การจำลองแบบคาฟคาเป็นเครื่องมือการออกแบบระดับสูง วัตถุประสงค์ของการเพิ่มเครื่องมือจำลองแบบคือเพื่อความทนทานที่แข็งแกร่งและความพร้อมใช้งานที่สูงขึ้น เครื่องมือจำลองแบบบางส่วนมีการกล่าวถึงด้านล่าง -

  • Create Topic Tool - สิ่งนี้จะสร้างหัวข้อที่มีจำนวนพาร์ติชันเริ่มต้นปัจจัยการจำลองและใช้โครงร่างเริ่มต้นของ Kafka ในการกำหนดแบบจำลอง

  • List Topic Tool- เครื่องมือนี้แสดงข้อมูลสำหรับรายการหัวข้อที่กำหนด หากไม่มีหัวข้อในบรรทัดคำสั่งเครื่องมือจะสอบถาม Zookeeper เพื่อรับหัวข้อทั้งหมดและแสดงข้อมูลสำหรับหัวข้อเหล่านั้น ฟิลด์ที่เครื่องมือแสดง ได้แก่ ชื่อหัวข้อพาร์ติชันผู้นำแบบจำลอง isr

  • Add Partition Tool- การสร้างหัวข้อต้องระบุจำนวนพาร์ติชันสำหรับหัวข้อ ในภายหลังอาจจำเป็นต้องใช้พาร์ติชันเพิ่มเติมสำหรับหัวข้อเมื่อปริมาณของหัวข้อจะเพิ่มขึ้น เครื่องมือนี้ช่วยในการเพิ่มพาร์ติชันเพิ่มเติมสำหรับหัวข้อเฉพาะและยังช่วยให้การกำหนดแบบจำลองด้วยตนเองของพาร์ติชันที่เพิ่มเข้ามา

Kafka รองรับการใช้งานทางอุตสาหกรรมที่ดีที่สุดในปัจจุบันมากมาย เราจะให้ภาพรวมสั้น ๆ เกี่ยวกับแอปพลิเคชั่นที่โดดเด่นที่สุดของ Kafka ในบทนี้

ทวิตเตอร์

Twitter เป็นบริการเครือข่ายสังคมออนไลน์ที่มีแพลตฟอร์มในการส่งและรับทวีตของผู้ใช้ ผู้ใช้ที่ลงทะเบียนสามารถอ่านและโพสต์ทวีตได้ แต่ผู้ใช้ที่ไม่ได้ลงทะเบียนสามารถอ่านทวีตได้เท่านั้น Twitter ใช้ Storm-Kafka เป็นส่วนหนึ่งของโครงสร้างพื้นฐานการประมวลผลสตรีม

LinkedIn

Apache Kafka ใช้ที่ LinkedIn สำหรับข้อมูลสตรีมกิจกรรมและเมตริกการดำเนินงาน ระบบ Kafka mes-saging ช่วย LinkedIn ด้วยผลิตภัณฑ์ต่างๆเช่น LinkedIn Newsfeed, LinkedIn Today สำหรับการใช้ข้อความออนไลน์และนอกเหนือจากระบบการวิเคราะห์ออฟไลน์เช่น Hadoop ความทนทานที่แข็งแกร่งของ Kafka เป็นปัจจัยสำคัญอย่างหนึ่งในการเชื่อมต่อกับ LinkedIn

Netflix

Netflix เป็นผู้ให้บริการสตรีมมิ่งสื่ออินเทอร์เน็ตตามความต้องการข้ามชาติข้ามชาติ Netflix ใช้ Kafka สำหรับการตรวจสอบแบบเรียลไทม์และการประมวลผลเหตุการณ์

Mozilla

Mozilla เป็นชุมชนซอฟต์แวร์ฟรีที่สร้างขึ้นในปี 1998 โดยสมาชิกของ Netscape Kafka จะเข้ามาแทนที่ส่วนหนึ่งของระบบการผลิตในปัจจุบันของ Mozilla เพื่อรวบรวมข้อมูลประสิทธิภาพและการใช้งานจากเบราว์เซอร์ของผู้ใช้ปลายทางสำหรับโครงการต่างๆเช่น Telemetry, Test Pilot เป็นต้น

Oracle

Oracle ให้การเชื่อมต่อแบบเนทีฟกับ Kafka จากผลิตภัณฑ์ Enterprise Service Bus ที่เรียกว่า OSB (Oracle Service Bus) ซึ่งช่วยให้นักพัฒนาสามารถใช้ประโยชน์จากความสามารถในการไกล่เกลี่ยในตัว OSB เพื่อใช้ไปป์ไลน์ข้อมูลแบบทีละขั้น