Apache Kafka - คู่มือฉบับย่อ
ใน Big Data มีการใช้ข้อมูลจำนวนมหาศาล เกี่ยวกับข้อมูลเรามีความท้าทายหลักสองประการความท้าทายแรกคือวิธีรวบรวมข้อมูลจำนวนมากและความท้าทายประการที่สองคือการวิเคราะห์ข้อมูลที่รวบรวม เพื่อเอาชนะความท้าทายเหล่านั้นคุณต้องมีระบบส่งข้อความ
Kafka ออกแบบมาสำหรับระบบปริมาณงานสูงแบบกระจาย Kafka มีแนวโน้มที่จะทำงานได้ดีมากในฐานะตัวแทนนายหน้าข้อความแบบเดิม ๆ เมื่อเปรียบเทียบกับระบบการส่งข้อความอื่น Kafka มีทรูพุตที่ดีกว่าการแบ่งพาร์ติชันในตัวการจำลองแบบและการทนต่อความผิดพลาดโดยธรรมชาติซึ่งทำให้เหมาะสำหรับแอปพลิเคชันการประมวลผลข้อความขนาดใหญ่
ระบบส่งข้อความคืออะไร?
ระบบส่งข้อความมีหน้าที่ในการถ่ายโอนข้อมูลจากแอปพลิเคชันหนึ่งไปยังอีกแอปพลิเคชันดังนั้นแอปพลิเคชันจึงสามารถมุ่งเน้นไปที่ข้อมูลได้ แต่ไม่ต้องกังวลเกี่ยวกับวิธีการแบ่งปัน การส่งข้อความแบบกระจายจะขึ้นอยู่กับแนวคิดของการจัดคิวข้อความที่เชื่อถือได้ ข้อความถูกจัดคิวแบบอะซิงโครนัสระหว่างแอปพลิเคชันไคลเอ็นต์และระบบการส่งข้อความ รูปแบบการส่งข้อความมีให้เลือกสองแบบคือแบบชี้ต่อจุดและอีกแบบคือระบบการส่งข้อความเผยแพร่สมัครสมาชิก (pub-sub) รูปแบบการส่งข้อความส่วนใหญ่เป็นไปตามpub-sub.
ชี้ไปที่ระบบส่งข้อความจุด
ในระบบจุดต่อจุดข้อความจะยังคงอยู่ในคิว ผู้บริโภคอย่างน้อยหนึ่งรายสามารถใช้ข้อความในคิวได้ แต่ข้อความหนึ่ง ๆ สามารถใช้งานได้โดยผู้บริโภคสูงสุดหนึ่งรายเท่านั้น เมื่อผู้บริโภคอ่านข้อความในคิวข้อความนั้นจะหายไปจากคิวนั้น ตัวอย่างทั่วไปของระบบนี้คือระบบประมวลผลคำสั่งซึ่งแต่ละคำสั่งจะได้รับการประมวลผลโดยตัวประมวลผลคำสั่งเดียว แต่โปรเซสเซอร์หลายคำสั่งสามารถทำงานได้เช่นกันในเวลาเดียวกัน แผนภาพต่อไปนี้แสดงถึงโครงสร้าง
เผยแพร่ - สมัครรับข้อความระบบ
ในระบบเผยแพร่ - สมัครสมาชิกข้อความจะยังคงอยู่ในหัวข้อ แตกต่างจากระบบจุดต่อจุดผู้บริโภคสามารถสมัครสมาชิกหัวข้อหนึ่งหรือหลายหัวข้อและใช้ข้อความทั้งหมดในหัวข้อนั้น ในระบบการสมัครสมาชิกเผยแพร่ผู้ผลิตข้อความเรียกว่าผู้เผยแพร่และผู้บริโภคข้อความเรียกว่าสมาชิก ตัวอย่างในชีวิตจริงคือ Dish TV ซึ่งเผยแพร่ช่องต่างๆเช่นกีฬาภาพยนตร์เพลง ฯลฯ และทุกคนสามารถสมัครรับข้อมูลจากชุดช่องของตนเองและรับเมื่อใดก็ตามที่มีช่องที่สมัครรับข้อมูล
คาฟคาคืออะไร?
Apache Kafka เป็นระบบส่งข้อความแบบเผยแพร่ - สมัครสมาชิกแบบกระจายและคิวที่มีประสิทธิภาพซึ่งสามารถจัดการข้อมูลจำนวนมากและช่วยให้คุณสามารถส่งผ่านข้อความจากปลายทางหนึ่งไปยังอีกจุดหนึ่งได้ คาฟคาเหมาะสำหรับการใช้ข้อความทั้งออฟไลน์และออนไลน์ ข้อความ Kafka จะยังคงอยู่บนดิสก์และจำลองแบบภายในคลัสเตอร์เพื่อป้องกันข้อมูลสูญหาย Kafka สร้างขึ้นจากบริการซิงโครไนซ์ ZooKeeper ทำงานร่วมกับ Apache Storm และ Spark ได้เป็นอย่างดีสำหรับการวิเคราะห์ข้อมูลสตรีมมิ่งแบบเรียลไทม์
สิทธิประโยชน์
ต่อไปนี้เป็นประโยชน์บางประการของ Kafka -
Reliability - Kafka มีการแจกจ่ายแบ่งพาร์ติชันจำลองแบบและความทนทานต่อความผิดพลาด
Scalability - ระบบส่งข้อความ Kafka ปรับขนาดได้อย่างง่ายดายโดยไม่ต้องเสียเวลา ..
Durability- Kafka ใช้
บันทึกการกระทำแบบกระจาย
ซึ่งหมายความว่าข้อความยังคงอยู่บนดิสก์โดยเร็วที่สุดจึงมีความทนทาน ..Performance- Kafka มีปริมาณงานสูงสำหรับทั้งการเผยแพร่และการสมัครรับข้อความ รักษาประสิทธิภาพการทำงานที่เสถียรแม้จะเก็บข้อความไว้หลาย TB
Kafka รวดเร็วมากและรับประกันการหยุดทำงานเป็นศูนย์และข้อมูลสูญหายเป็นศูนย์
ใช้กรณี
Kafka สามารถใช้กับ Use Case ได้หลายแบบ บางส่วนมีการระบุไว้ด้านล่าง -
Metrics- คาฟคามักใช้สำหรับข้อมูลการตรวจสอบการปฏิบัติงาน สิ่งนี้เกี่ยวข้องกับการรวบรวมสถิติจากแอปพลิเคชันแบบกระจายเพื่อสร้างฟีดข้อมูลการดำเนินงานจากส่วนกลาง
Log Aggregation Solution - Kafka สามารถใช้งานได้ทั่วทั้งองค์กรเพื่อรวบรวมบันทึกจากบริการที่หลากหลายและทำให้สามารถใช้งานได้ในรูปแบบมาตรฐานสำหรับผู้ใช้หลายคน
Stream Processing- เฟรมเวิร์กยอดนิยมเช่น Storm และ Spark Streaming อ่านข้อมูลจากหัวข้อประมวลผลและเขียนข้อมูลที่ประมวลผลไปยังหัวข้อใหม่ซึ่งพร้อมใช้งานสำหรับผู้ใช้และแอปพลิเคชัน ความทนทานที่แข็งแกร่งของ Kafka ยังมีประโยชน์อย่างมากในบริบทของการประมวลผลสตรีม
ต้องการคาฟคา
Kafka เป็นแพลตฟอร์มแบบครบวงจรสำหรับจัดการฟีดข้อมูลแบบเรียลไทม์ทั้งหมด Kafka รองรับการส่งข้อความเวลาแฝงต่ำและรับประกันความทนทานต่อความผิดพลาดในกรณีที่เครื่องขัดข้อง มีความสามารถในการรองรับผู้บริโภคที่หลากหลายจำนวนมาก Kafka เร็วมากเขียน 2 ล้านครั้ง / วินาที Kafka ยังคงเก็บข้อมูลทั้งหมดไว้ในดิสก์ซึ่งโดยพื้นฐานแล้วหมายความว่าสิ่งที่เขียนทั้งหมดไปที่แคชของหน้าของ OS (RAM) ทำให้การถ่ายโอนข้อมูลจากแคชของเพจไปยังซ็อกเก็ตเครือข่ายมีประสิทธิภาพมาก
ก่อนที่จะก้าวเข้าสู่คาฟคาคุณต้องตระหนักถึงคำศัพท์หลัก ๆ เช่นหัวข้อนายหน้าผู้ผลิตและผู้บริโภค แผนภาพต่อไปนี้แสดงคำศัพท์หลักและตารางอธิบายส่วนประกอบของแผนภาพโดยละเอียด
ในแผนภาพด้านบนหัวข้อถูกกำหนดค่าเป็นสามพาร์ติชัน พาร์ติชั่น 1 มีปัจจัยออฟเซ็ต 2 ตัว 0 และ 1 พาร์ติชั่น 2 มีสี่ออฟเซ็ตแฟคเตอร์ 0, 1, 2 และ 3 พาร์ติชั่น 3 มีอ็อฟเซ็ตแฟคเตอร์ 0 หนึ่งไอดีของแบบจำลองจะเหมือนกับ id ของเซิร์ฟเวอร์ที่โฮสต์มัน
สมมติว่าหากตั้งค่าปัจจัยการจำลองแบบของหัวข้อเป็น 3 Kafka จะสร้างแบบจำลองที่เหมือนกัน 3 รายการของแต่ละพาร์ติชันและวางไว้ในคลัสเตอร์เพื่อให้พร้อมใช้งานสำหรับการดำเนินการทั้งหมด ในการปรับสมดุลภาระในคลัสเตอร์แต่ละโบรกเกอร์จะจัดเก็บพาร์ติชันเหล่านั้นอย่างน้อยหนึ่งพาร์ติชัน ผู้ผลิตและผู้บริโภคหลายรายสามารถเผยแพร่และเรียกดูข้อความได้ในเวลาเดียวกัน
ส. เลขที่ | ส่วนประกอบและคำอธิบาย |
---|---|
1 | Topics กระแสของข้อความที่อยู่ในหมวดหมู่หนึ่งเรียกว่าหัวข้อ ข้อมูลถูกจัดเก็บในหัวข้อ หัวข้อจะแบ่งออกเป็นพาร์ติชัน สำหรับแต่ละหัวข้อ Kafka จะเก็บมินิมัมไว้หนึ่งพาร์ติชัน แต่ละพาร์ติชันดังกล่าวมีข้อความตามลำดับที่ไม่เปลี่ยนรูป พาร์ติชันถูกใช้งานเป็นชุดของไฟล์เซ็กเมนต์ที่มีขนาดเท่ากัน |
2 | Partition หัวข้ออาจมีหลายพาร์ติชั่นดังนั้นจึงสามารถจัดการกับข้อมูลได้ตามอำเภอใจ |
3 | Partition offset แต่ละข้อความแบ่งพาร์ติชันที่มี ID ลำดับไม่ซ้ำกันเรียกว่าเป็นชดเชย |
4 | Replicas of partition Replicas ไม่ใช่อะไรนอกจากการ |
5 | Brokers
|
6 | Kafka Cluster คาฟคามีนายหน้ามากกว่าหนึ่งรายเรียกว่าคลัสเตอร์คาฟคา คลัสเตอร์ Kafka สามารถขยายได้โดยไม่ต้องหยุดทำงาน คลัสเตอร์เหล่านี้ใช้เพื่อจัดการการคงอยู่และการจำลองข้อมูลข้อความ |
7 | Producers ผู้ผลิตเป็นผู้เผยแพร่ข้อความไปยังหัวข้อ Kafka อย่างน้อยหนึ่งหัวข้อ ผู้ผลิตส่งข้อมูลไปยังโบรกเกอร์คาฟคา ทุกครั้งที่ผู้ผลิตเผยแพร่ข้อความถึงนายหน้านายหน้าจะต่อท้ายข้อความไว้ในไฟล์ส่วนสุดท้าย จริงๆแล้วข้อความจะถูกต่อท้ายพาร์ติชัน Producer ยังสามารถส่งข้อความไปยังพาร์ติชันที่ต้องการได้ |
8 | Consumers ผู้บริโภคอ่านข้อมูลจากโบรกเกอร์ ผู้บริโภคสมัครรับข่าวสารตั้งแต่หนึ่งหัวข้อขึ้นไปและใช้ข้อความที่เผยแพร่โดยดึงข้อมูลจากโบรกเกอร์ |
9 | Leader
|
10 | Follower โหนดซึ่งเป็นไปตามคำแนะนำของผู้นำเรียกว่าเป็นผู้ตาม หากผู้นำล้มเหลวผู้ตามคนใดคนหนึ่งจะกลายเป็นผู้นำคนใหม่โดยอัตโนมัติ ผู้ติดตามทำหน้าที่เป็นผู้บริโภคทั่วไปดึงข้อความและอัปเดตที่เก็บข้อมูลของตนเอง |
ดูภาพประกอบต่อไปนี้ แสดงแผนภาพคลัสเตอร์ของคาฟคา
ตารางต่อไปนี้อธิบายส่วนประกอบแต่ละอย่างที่แสดงในแผนภาพด้านบน
ส. เลขที่ | ส่วนประกอบและคำอธิบาย |
---|---|
1 | Broker โดยทั่วไปคลัสเตอร์ Kafka ประกอบด้วยโบรกเกอร์หลายรายเพื่อรักษาสมดุลของภาระงาน โบรกเกอร์ Kafka ไร้สัญชาติดังนั้นพวกเขาจึงใช้ ZooKeeper เพื่อรักษาสถานะคลัสเตอร์ของตน อินสแตนซ์โบรกเกอร์ Kafka หนึ่งรายการสามารถรองรับการอ่านและเขียนได้หลายแสนครั้งต่อวินาทีและ bro-ker แต่ละรายสามารถจัดการข้อความ TB ได้โดยไม่ส่งผลกระทบต่อประสิทธิภาพ การเลือกตั้งหัวหน้านายหน้าคาฟคาสามารถทำได้โดย ZooKeeper |
2 | ZooKeeper ZooKeeper ใช้สำหรับจัดการและประสานงานนายหน้าคาฟคา บริการ ZooKeeper ส่วนใหญ่จะใช้เพื่อแจ้งให้ผู้ผลิตและผู้บริโภคทราบเกี่ยวกับการมีนายหน้าใหม่ในระบบ Kafka หรือความล้มเหลวของนายหน้าในระบบ Kafka ตามการแจ้งเตือนที่ได้รับจาก Zookeeper เกี่ยวกับการมีอยู่หรือความล้มเหลวของนายหน้าจากนั้นโปรดูเซอร์และผู้บริโภคจะตัดสินใจและเริ่มประสานงานกับนายหน้ารายอื่น |
3 | Producers ผู้ผลิตส่งข้อมูลไปยังโบรกเกอร์ เมื่อนายหน้าใหม่เริ่มต้นผู้ผลิตทั้งหมดจะค้นหาและส่งข้อความไปยังนายหน้าใหม่นั้นโดยอัตโนมัติ ผู้ผลิต Kafka ไม่รอการตอบรับจากนายหน้าและส่งข้อความให้เร็วที่สุดเท่าที่นายหน้าจะจัดการได้ |
4 | Consumers เนื่องจากโบรกเกอร์ Kafka เป็นคนไร้สัญชาติซึ่งหมายความว่าผู้บริโภคต้องรักษาจำนวนข้อความที่ใช้ไปโดยใช้พาร์ติชันออฟเซ็ต หากผู้บริโภครับทราบการชดเชยข้อความใดข้อความหนึ่งแสดงว่าผู้บริโภคใช้ข้อความก่อนหน้านี้หมดแล้ว ผู้บริโภคส่งคำขอดึงแบบอะซิงโครนัสไปยังโบรกเกอร์เพื่อให้มีบัฟเฟอร์ของไบต์ที่พร้อมใช้งาน ผู้บริโภคสามารถย้อนกลับหรือข้ามไปยังจุดใดก็ได้ในพาร์ติชันเพียงแค่ใส่ค่าชดเชย ค่าชดเชยของผู้บริโภคจะแจ้งโดย ZooKeeper |
ณ ตอนนี้เราได้กล่าวถึงแนวคิดหลักของคาฟคา ตอนนี้ให้เราแสดงความคิดเห็นเกี่ยวกับขั้นตอนการทำงานของ Kafka
Kafka เป็นเพียงชุดของหัวข้อที่แบ่งออกเป็นพาร์ติชันหนึ่งหรือหลายพาร์ติชัน พาร์ติชัน Kafka คือลำดับของข้อความที่เรียงตามลำดับเชิงเส้นโดยแต่ละข้อความจะถูกระบุโดยดัชนีของพวกเขา (เรียกว่าออฟเซ็ต) ข้อมูลทั้งหมดในคลัสเตอร์ Kafka คือการรวมกันของพาร์ติชันที่ไม่ปะติดปะต่อกัน ข้อความขาเข้าจะถูกเขียนไว้ที่ส่วนท้ายของพาร์ติชันและข้อความจะถูกอ่านโดยผู้บริโภคตามลำดับ ความทนทานมีให้โดยการจำลองข้อความไปยังโบรกเกอร์ต่างๆ
Kafka ให้บริการทั้งระบบส่งข้อความ pub-sub และตามคิวในลักษณะที่รวดเร็วเชื่อถือได้คงอยู่ทนต่อความผิดพลาดและไม่มีเวลาหยุดทำงาน ในทั้งสองกรณีผู้ผลิตเพียงแค่ส่งข้อความไปยังหัวข้อและผู้บริโภคสามารถเลือกระบบการส่งข้อความประเภทใดก็ได้ขึ้นอยู่กับความต้องการของพวกเขา ให้เราทำตามขั้นตอนในหัวข้อถัดไปเพื่อทำความเข้าใจว่าผู้บริโภคสามารถเลือกระบบการส่งข้อความที่ต้องการได้อย่างไร
เวิร์กโฟลว์ของ Pub-Sub Messaging
ต่อไปนี้เป็นขั้นตอนการทำงานที่ชาญฉลาดของ Pub-Sub Messaging -
ผู้ผลิตส่งข้อความถึงหัวข้อในช่วงเวลาปกติ
โบรกเกอร์ Kafka เก็บข้อความทั้งหมดในพาร์ติชันที่กำหนดค่าไว้สำหรับหัวข้อนั้น ๆ เพื่อให้แน่ใจว่าข้อความจะถูกแบ่งใช้อย่างเท่าเทียมกันระหว่างพาร์ติชัน หากผู้ผลิตส่งข้อความสองข้อความและมีสองพาร์ติชัน Kafka จะเก็บหนึ่งข้อความในพาร์ติชันแรกและข้อความที่สองในพาร์ติชันที่สอง
ผู้บริโภคสมัครรับหัวข้อเฉพาะ
เมื่อผู้บริโภคสมัครรับหัวข้อแล้ว Kafka จะให้ค่าชดเชยปัจจุบันของหัวข้อแก่ผู้บริโภคและยังบันทึกค่าชดเชยในชุด Zookeeper
ผู้บริโภคจะร้องขอ Kafka ในช่วงเวลาปกติ (เช่น 100 Ms) สำหรับข้อความใหม่
เมื่อคาฟคาได้รับข้อความจากผู้ผลิตระบบจะส่งต่อข้อความเหล่านี้ไปยังผู้บริโภค
ผู้บริโภคจะได้รับข้อความและดำเนินการ
เมื่อข้อความได้รับการประมวลผลผู้บริโภคจะส่งการตอบรับไปยังนายหน้าคาฟคา
เมื่อ Kafka ได้รับการตอบรับแล้วจะเปลี่ยนค่าชดเชยเป็นค่าใหม่และอัปเดตใน Zookeeper เนื่องจากการชดเชยยังคงอยู่ใน Zookeeper ผู้บริโภคจึงสามารถอ่านข้อความถัดไปได้อย่างถูกต้องแม้ในช่วงที่เซิร์ฟเวอร์หยุดทำงาน
ขั้นตอนข้างต้นนี้จะทำซ้ำจนกว่าผู้บริโภคจะหยุดคำขอ
ผู้บริโภคมีตัวเลือกในการกรอกลับ / ข้ามไปยังออฟเซ็ตที่ต้องการได้ตลอดเวลาและอ่านข้อความที่ตามมาทั้งหมด
เวิร์กโฟลว์ของ Queue Messaging / Consumer Group
ในระบบส่งข้อความคิวแทนที่จะเป็นผู้บริโภครายเดียวกลุ่มผู้บริโภคที่มีรหัสกลุ่ม
เดียวกันจะสมัครรับหัวข้อ พูดง่ายๆคือผู้บริโภคที่สมัครรับหัวข้อด้วยGroup ID
เดียวกันถือเป็นกลุ่มเดียวและมีการแชร์ข้อความระหว่างกัน ให้เราตรวจสอบขั้นตอนการทำงานจริงของระบบนี้
ผู้ผลิตส่งข้อความถึงหัวข้อในช่วงเวลาปกติ
Kafka จัดเก็บข้อความทั้งหมดในพาร์ติชันที่กำหนดค่าไว้สำหรับหัวข้อนั้น ๆ ซึ่งคล้ายกับสถานการณ์ก่อนหน้านี้
ผู้บริโภคเดียวเป็นสมาชิกกับหัวข้อที่เฉพาะเจาะจงเช่นสมมติ
กระทู้-01
กับรหัสกลุ่ม
เป็นกลุ่มที่
1ปฏิสัมพันธ์ Kafka กับผู้บริโภคในลักษณะเดียวกับผับ-Sub Messaging จนผู้บริโภคใหม่สมัครหัวข้อเดียวกัน
กระทู้-01
แบบเดียวกับที่ID กลุ่ม
เป็นกลุ่มที่
1เมื่อผู้บริโภครายใหม่มาถึง Kafka จะเปลี่ยนการทำงานเป็นโหมดแบ่งปันและแบ่งปันข้อมูลระหว่างผู้บริโภคทั้งสอง การแบ่งปันนี้จะดำเนินต่อไปจนกว่าจำนวนผู้ร่วมก่อเหตุจะถึงจำนวนพาร์ติชันที่กำหนดค่าไว้สำหรับหัวข้อนั้น ๆ
เมื่อจำนวนผู้บริโภคเกินจำนวนพาร์ติชันผู้ใช้รายใหม่จะไม่ได้รับข้อความใด ๆ อีกจนกว่าผู้บริโภคที่มีอยู่จะยกเลิกการเป็นสมาชิก สถานการณ์นี้เกิดขึ้นเนื่องจากผู้บริโภคแต่ละรายใน Kafka จะได้รับมอบหมายอย่างน้อยหนึ่งพาร์ติชันและเมื่อกำหนดพาร์ติชันทั้งหมดให้กับผู้บริโภคที่มีอยู่แล้วผู้บริโภครายใหม่จะต้องรอ
คุณลักษณะนี้จะเรียกว่าเป็นกลุ่มผู้บริโภค
บทบาทของ ZooKeeper
การพึ่งพาที่สำคัญของ Apache Kafka คือ Apache Zookeeper ซึ่งเป็นบริการกำหนดค่าและซิงโครไนซ์แบบกระจาย Zookeeper ทำหน้าที่เป็นอินเทอร์เฟซการประสานงานระหว่างโบรกเกอร์ Kafka และผู้บริโภค เซิร์ฟเวอร์ Kafka แบ่งปันข้อมูลผ่านคลัสเตอร์ Zookeeper Kafka จัดเก็บข้อมูลเมตาพื้นฐานใน Zookeeper เช่นข้อมูลเกี่ยวกับหัวข้อนายหน้าการชดเชยผู้บริโภค (ตัวอ่านคิว) เป็นต้น
เนื่องจากข้อมูลที่สำคัญทั้งหมดจะถูกเก็บไว้ใน Zookeeper และโดยปกติจะจำลองข้อมูลนี้ในทั้งชุดความล้มเหลวของนายหน้า Kafka / Zookeeper จึงไม่ส่งผลกระทบต่อสถานะของคลัสเตอร์ Kafka คาฟคาจะคืนสถานะเมื่อ Zookeeper รีสตาร์ท ทำให้ Kafka หยุดทำงานเป็นศูนย์ การเลือกตั้งผู้นำระหว่างนายหน้าคาฟคายังทำได้โดยใช้ Zookeeper ในกรณีที่ผู้นำล้มเหลว
ต้องการเรียนรู้เพิ่มเติมเกี่ยวกับการดูแลสัตว์โปรดดูZookeeper
ให้เราดำเนินการเพิ่มเติมเกี่ยวกับวิธีการติดตั้ง Java, ZooKeeper และ Kafka บนเครื่องของคุณในบทถัดไป
ต่อไปนี้เป็นขั้นตอนในการติดตั้ง Java บนเครื่องของคุณ
ขั้นตอนที่ 1 - ตรวจสอบการติดตั้ง Java
หวังว่าคุณได้ติดตั้ง java บนเครื่องของคุณแล้วดังนั้นคุณเพียงแค่ยืนยันโดยใช้คำสั่งต่อไปนี้
$ java -version
หากติดตั้ง java บนเครื่องของคุณสำเร็จคุณจะเห็นเวอร์ชันของ Java ที่ติดตั้ง
ขั้นตอนที่ 1.1 - ดาวน์โหลด JDK
หากไม่ได้ดาวน์โหลด Java โปรดดาวน์โหลด JDK เวอร์ชันล่าสุดโดยไปที่ลิงค์ต่อไปนี้และดาวน์โหลดเวอร์ชันล่าสุด
http://www.oracle.com/technetwork/java/javase/downloads/index.htmlตอนนี้เวอร์ชันล่าสุดคือ JDK 8u 60 และไฟล์คือ“ jdk-8u60-linux-x64.tar.gz” โปรดดาวน์โหลดไฟล์บนเครื่องของคุณ
ขั้นตอนที่ 1.2 - แตกไฟล์
โดยทั่วไปไฟล์ที่กำลังดาวน์โหลดจะถูกเก็บไว้ในโฟลเดอร์ดาวน์โหลดตรวจสอบและแยกการตั้งค่า tar โดยใช้คำสั่งต่อไปนี้
$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz
ขั้นตอนที่ 1.3 - ย้ายไปที่ Opt Directory
ในการทำให้ java พร้อมใช้งานสำหรับผู้ใช้ทั้งหมดให้ย้ายเนื้อหา java ที่แยกแล้วไปยังusr / local / java
/ folder
$ su
password: (type password of root user)
$ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/
ขั้นตอนที่ 1.4 - กำหนดเส้นทาง
ในการกำหนดเส้นทางและตัวแปร JAVA_HOME ให้เพิ่มคำสั่งต่อไปนี้ในไฟล์ ~ / .bashrc
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin
ตอนนี้ใช้การเปลี่ยนแปลงทั้งหมดในระบบที่กำลังทำงานอยู่
$ source ~/.bashrc
ขั้นตอนที่ 1.5 - ทางเลือก Java
ใช้คำสั่งต่อไปนี้เพื่อเปลี่ยน Java Alternatives
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
Step 1.6 - ตอนนี้ตรวจสอบ java โดยใช้คำสั่งการตรวจสอบ (java -version) ที่อธิบายไว้ในขั้นตอนที่ 1
ขั้นตอนที่ 2 - การติดตั้ง ZooKeeper Framework
ขั้นตอนที่ 2.1 - ดาวน์โหลด ZooKeeper
ในการติดตั้ง ZooKeeper framework บนเครื่องของคุณให้ไปที่ลิงค์ต่อไปนี้และดาวน์โหลด ZooKeeper เวอร์ชันล่าสุด
http://zookeeper.apache.org/releases.htmlณ ตอนนี้ ZooKeeper เวอร์ชันล่าสุดคือ 3.4.6 (ZooKeeper-3.4.6.tar.gz)
ขั้นตอนที่ 2.2 - แตกไฟล์ tar
แตกไฟล์ tar โดยใช้คำสั่งต่อไปนี้
$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6
$ mkdir data
ขั้นตอนที่ 2.3 - สร้างไฟล์กำหนดค่า
เปิดไฟล์คอนฟิกูเรชันชื่อconf / zoo.cfg
โดยใช้คำสั่ง vi“ conf / zoo.cfg” และพารามิเตอร์ต่อไปนี้ทั้งหมดเพื่อตั้งเป็นจุดเริ่มต้น
$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
เมื่อบันทึกไฟล์คอนฟิกูเรชันเรียบร้อยแล้วและกลับไปที่เทอร์มินัลอีกครั้งคุณสามารถเริ่มเซิร์ฟเวอร์ Zookeeper ได้
ขั้นตอนที่ 2.4 - เริ่มเซิร์ฟเวอร์ ZooKeeper
$ bin/zkServer.sh start
หลังจากดำเนินการคำสั่งนี้คุณจะได้รับคำตอบตามที่แสดงด้านล่าง -
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED
ขั้นตอนที่ 2.5 - เริ่ม CLI
$ bin/zkCli.sh
หลังจากพิมพ์คำสั่งด้านบนคุณจะเชื่อมต่อกับเซิร์ฟเวอร์ Zookeeper และจะได้รับคำตอบด้านล่าง
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]
ขั้นตอนที่ 2.6 - หยุดเซิร์ฟเวอร์ Zookeeper
หลังจากเชื่อมต่อเซิร์ฟเวอร์และดำเนินการทั้งหมดแล้วคุณสามารถหยุดเซิร์ฟเวอร์ Zookeeper ได้ด้วยคำสั่งต่อไปนี้ -
$ bin/zkServer.sh stop
ตอนนี้คุณได้ติดตั้ง Java และ ZooKeeper บนเครื่องของคุณเรียบร้อยแล้ว ให้เราดูขั้นตอนในการติดตั้ง Apache Kafka
ขั้นตอนที่ 3 - การติดตั้ง Apache Kafka
ให้เราทำตามขั้นตอนต่อไปนี้เพื่อติดตั้ง Kafka บนเครื่องของคุณ
ขั้นตอนที่ 3.1 - ดาวน์โหลด Kafka
ในการติดตั้ง Kafka บนเครื่องของคุณคลิกที่ลิงค์ด้านล่าง -
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgzตอนนี้เวอร์ชันล่าสุดคือ - kafka_2.11_0.9.0.0.tgz จะถูกดาวน์โหลดลงในเครื่องของคุณ
ขั้นตอนที่ 3.2 - แตกไฟล์ tar
แตกไฟล์ tar โดยใช้คำสั่งต่อไปนี้ -
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0
ตอนนี้คุณได้ดาวน์โหลด Kafka เวอร์ชันล่าสุดบนเครื่องของคุณแล้ว
ขั้นตอนที่ 3.3 - เริ่มเซิร์ฟเวอร์
คุณสามารถเริ่มต้นเซิร์ฟเวอร์โดยให้คำสั่งต่อไปนี้ -
$ bin/kafka-server-start.sh config/server.properties
หลังจากเซิร์ฟเวอร์เริ่มทำงานคุณจะเห็นคำตอบด้านล่างบนหน้าจอของคุณ -
$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….
ขั้นตอนที่ 4 - หยุดเซิร์ฟเวอร์
หลังจากดำเนินการทั้งหมดแล้วคุณสามารถหยุดเซิร์ฟเวอร์ได้โดยใช้คำสั่งต่อไปนี้ -
$ bin/kafka-server-stop.sh config/server.properties
ตอนนี้เราได้พูดคุยเกี่ยวกับการติดตั้ง Kafka แล้วเราสามารถเรียนรู้วิธีดำเนินการพื้นฐานบน Kafka ได้ในบทถัดไป
ขั้นแรกให้เราเริ่มใช้การกำหนดค่าโบรกเกอร์โหนดเดียว
จากนั้นเราจะย้ายการตั้งค่าของเราไปยังการกำหนดค่าโบรกเกอร์หลายโหนดเดียว
หวังว่าตอนนี้คุณจะติดตั้ง Java, ZooKeeper และ Kafka บนเครื่องของคุณแล้ว ก่อนที่จะย้ายไปที่ Kafka Cluster Setup ก่อนอื่นคุณต้องเริ่ม ZooKeeper ของคุณเนื่องจาก Kafka Cluster ใช้ ZooKeeper
เริ่ม ZooKeeper
เปิดเทอร์มินัลใหม่และพิมพ์คำสั่งต่อไปนี้ -
bin/zookeeper-server-start.sh config/zookeeper.properties
ในการเริ่ม Kafka Broker ให้พิมพ์คำสั่งต่อไปนี้ -
bin/kafka-server-start.sh config/server.properties
หลังจากเริ่ม Kafka Broker พิมพ์คำสั่งjps
บนเทอร์มินัล ZooKeeper และคุณจะเห็นคำตอบต่อไปนี้ -
821 QuorumPeerMain
928 Kafka
931 Jps
ตอนนี้คุณสามารถเห็นภูตสองตัวที่ทำงานบนเทอร์มินัลโดยที่ QuorumPeerMain คือ ZooKeeper daemon และอีกตัวคือ Kafka daemon
การกำหนดค่าโบรกเกอร์โหนดเดียว - เดี่ยว
ในการกำหนดค่านี้คุณมีอินสแตนซ์ ZooKeeper และรหัสนายหน้าเพียงชุดเดียว ต่อไปนี้เป็นขั้นตอนในการกำหนดค่า -
Creating a Kafka Topic- Kafka มียูทิลิตี้บรรทัดคำสั่งชื่อkafka-topics.sh
เพื่อสร้างหัวข้อบนเซิร์ฟเวอร์ เปิดเทอร์มินัลใหม่และพิมพ์ตัวอย่างด้านล่าง
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
เราเพิ่งสร้างหัวข้อชื่อHello-Kafka
ด้วยพาร์ติชันเดียวและตัวประกอบการจำลองหนึ่งตัว ผลลัพธ์ที่สร้างขึ้นข้างต้นจะคล้ายกับผลลัพธ์ต่อไปนี้ -
Output- สร้างหัวข้อHello-Kafka
เมื่อสร้างหัวข้อแล้วคุณจะได้รับการแจ้งเตือนในหน้าต่างเทอร์มินัลโบรกเกอร์ Kafka และบันทึกสำหรับหัวข้อที่สร้างขึ้นซึ่งระบุไว้ใน“ / tmp / kafka-logs /“ ในไฟล์ config / server.properties
รายการหัวข้อ
ในการรับรายการหัวข้อในเซิร์ฟเวอร์ Kafka คุณสามารถใช้คำสั่งต่อไปนี้ -
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181
Output
Hello-Kafka
เนื่องจากเราได้สร้างหัวข้อจึงจะแสดงเฉพาะHello-Kafka
เท่านั้น สมมติว่าหากคุณสร้างมากกว่าหนึ่งหัวข้อคุณจะได้รับชื่อหัวข้อในผลลัพธ์
เริ่ม Producer เพื่อส่งข้อความ
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
จากไวยากรณ์ข้างต้นจำเป็นต้องมีพารามิเตอร์หลักสองตัวสำหรับไคลเอนต์บรรทัดคำสั่งผู้ผลิต -
Broker-list- รายชื่อโบรกเกอร์ที่เราต้องการส่งข้อความไป ในกรณีนี้เรามีนายหน้าเพียงรายเดียว ไฟล์ Config / server.properties มี id พอร์ตโบรกเกอร์เนื่องจากเราทราบว่าโบรกเกอร์ของเรากำลังฟังพอร์ต 9092 ดังนั้นคุณจึงสามารถระบุได้โดยตรง
ชื่อหัวข้อ - นี่คือตัวอย่างสำหรับชื่อหัวข้อ
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
โปรดิวเซอร์จะรอข้อมูลจาก stdin และเผยแพร่ไปยังคลัสเตอร์ Kafka โดยดีฟอลต์ทุกบรรทัดใหม่จะถูกเผยแพร่เป็นข้อความใหม่จากนั้นคุณสมบัติของผู้ผลิตเริ่มต้นจะถูกระบุในไฟล์config / producer.properties
ตอนนี้คุณสามารถพิมพ์ข้อความสองสามบรรทัดในเทอร์มินัลดังที่แสดงด้านล่าง
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
เริ่มให้ผู้บริโภครับข้อความ
คล้ายกับการผลิตคุณสมบัติของผู้บริโภคเริ่มต้นที่ระบุไว้ในconfig / consumer.proper สัมพันธ์
ไฟล์ เปิดเทอร์มินัลใหม่และพิมพ์ไวยากรณ์ด้านล่างเพื่อใช้ข้อความ
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning
Output
Hello
My first message
My second message
สุดท้ายคุณสามารถป้อนข้อความจากเทอร์มินัลของผู้ผลิตและเห็นข้อความเหล่านี้ปรากฏในเทอร์มินัลของผู้บริโภค ณ ตอนนี้คุณมีความเข้าใจเป็นอย่างดีเกี่ยวกับคลัสเตอร์โหนดเดียวกับโบรกเกอร์เดียว ให้เราไปยังการกำหนดค่าโบรกเกอร์หลายรายการ
การกำหนดค่าโบรกเกอร์หลายโหนดเดียว
ก่อนที่จะย้ายไปยังการตั้งค่าคลัสเตอร์โบรกเกอร์หลายรายการให้เริ่มเซิร์ฟเวอร์ ZooKeeper ของคุณก่อน
Create Multiple Kafka Brokers- เรามีอินสแตนซ์โบรกเกอร์ Kafka หนึ่งรายการอยู่แล้วใน con-fig / server.properties ตอนนี้เราต้องการอินสแตนซ์โบรกเกอร์หลายอินสแตนซ์ดังนั้นให้คัดลอกไฟล์ server.prop-erties ที่มีอยู่ลงในไฟล์ config ใหม่สองไฟล์และเปลี่ยนชื่อเป็น server-one.properties และ server-two.prop-erties จากนั้นแก้ไขไฟล์ใหม่ทั้งสองไฟล์และกำหนดการเปลี่ยนแปลงต่อไปนี้ -
config / server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
Start Multiple Brokers- หลังจากทำการเปลี่ยนแปลงทั้งหมดบนเซิร์ฟเวอร์สามเครื่องแล้วให้เปิดเทอร์มินัลใหม่สามเทอร์มินัลเพื่อเริ่มโบรกเกอร์ทีละรายการ
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
ตอนนี้เรามีโบรกเกอร์สามแห่งที่ทำงานบนเครื่อง ลองด้วยตัวเองเพื่อตรวจสอบภูตทั้งหมดโดยพิมพ์jps บนเทอร์มินัล ZooKeeper จากนั้นคุณจะเห็นการตอบสนอง
การสร้างหัวข้อ
ให้เรากำหนดค่าปัจจัยการจำลองเป็นสามสำหรับหัวข้อนี้เนื่องจากเรามีโบรกเกอร์สามแห่งที่ทำงานอยู่ หากคุณมีโบรกเกอร์สองรายค่าแบบจำลองที่กำหนดจะเป็นสอง
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
Output
created topic “Multibrokerapplication”
อธิบาย
คำสั่งที่ใช้ในการตรวจสอบซึ่งนายหน้าจะฟังในหัวข้อสร้างขึ้นในปัจจุบันที่แสดงด้านล่าง -
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Output
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1
ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1
จากผลลัพธ์ข้างต้นเราสามารถสรุปได้ว่าบรรทัดแรกให้ข้อมูลสรุปของพาร์ติชันทั้งหมดแสดงชื่อหัวข้อจำนวนพาร์ติชันและปัจจัยการจำลองแบบที่เราได้เลือกไว้แล้ว ในบรรทัดที่สองแต่ละโหนดจะเป็นผู้นำสำหรับส่วนที่สุ่มเลือกของพาร์ติชัน
ในกรณีของเราเราเห็นว่าโบรกเกอร์รายแรกของเรา (ที่มีโบรกเกอร์ id 0) เป็นผู้นำ จากนั้น Replicas: 0,2,1 หมายความว่าโบรกเกอร์ทั้งหมดทำซ้ำหัวข้อในที่สุดIsr
ก็คือชุดของแบบจำลองในการซิงค์
นี่คือส่วนย่อยของแบบจำลองที่มีชีวิตอยู่ในขณะนี้และถูกติดตามโดยผู้นำ
เริ่ม Producer เพื่อส่งข้อความ
ขั้นตอนนี้ยังคงเหมือนกับในการตั้งค่านายหน้าเดี่ยว
Example
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
Output
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message
เริ่มให้ผู้บริโภครับข้อความ
ขั้นตอนนี้ยังคงเหมือนเดิมตามที่แสดงในการตั้งค่านายหน้าเดี่ยว
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning
Output
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
การใช้งานหัวข้อพื้นฐาน
ในบทนี้เราจะพูดถึงการดำเนินงานของหัวข้อพื้นฐานต่างๆ
การแก้ไขหัวข้อ
ดังที่คุณได้เข้าใจวิธีสร้างหัวข้อใน Kafka Cluster แล้ว ตอนนี้ให้เราแก้ไขหัวข้อที่สร้างขึ้นโดยใช้คำสั่งต่อไปนี้
Syntax
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
Example
We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2
Output
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
การลบหัวข้อ
ในการลบหัวข้อคุณสามารถใช้ไวยากรณ์ต่อไปนี้
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
Example
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Output
> Topic Hello-kafka marked for deletion
Note −สิ่งนี้จะไม่มีผลกระทบหาก delete.topic.enable ไม่ได้ตั้งค่าเป็นจริง
ให้เราสร้างแอปพลิเคชันสำหรับเผยแพร่และใช้งานข้อความโดยใช้ไคลเอ็นต์ Java ไคลเอ็นต์ผู้ผลิต Kafka ประกอบด้วย API ต่อไปนี้
KafkaProducer API
ให้เราเข้าใจชุดที่สำคัญที่สุดของ Kafka Producer API ในส่วนนี้ ส่วนกลางของ KafkaProducer API คือคลาสKafkaProducer
คลาส KafkaProducer มีตัวเลือกในการเชื่อมต่อโบรกเกอร์ Kafka ในคอนสตรัคเตอร์ด้วยวิธีการดังต่อไปนี้
คลาส KafkaProducer จัดเตรียมวิธีการส่งเพื่อส่งข้อความไปยังหัวข้อแบบอะซิงโครนัส ลายเซ็นของ send () มีดังต่อไปนี้
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
ProducerRecord - ผู้ผลิตจัดการบัฟเฟอร์ของเร็กคอร์ดที่รอการส่ง
Callback - การโทรกลับที่ผู้ใช้จัดหาเพื่อดำเนินการเมื่อเซิร์ฟเวอร์ได้รับการยอมรับว่าบันทึกได้รับการปรับปรุง (null หมายถึงไม่มีการโทรกลับ)
คลาส KafkaProducer มีวิธีการล้างเพื่อให้แน่ใจว่าข้อความที่ส่งไปก่อนหน้านี้เสร็จสมบูรณ์แล้วจริงๆ ไวยากรณ์ของวิธีการล้างมีดังนี้ -
public void flush()
คลาส KafkaProducer จัดเตรียมเมธอด partitionFor ซึ่งช่วยในการรับข้อมูลเมตาของพาร์ติชันสำหรับหัวข้อที่กำหนด สามารถใช้สำหรับการแบ่งพาร์ติชันแบบกำหนดเอง ลายเซ็นของวิธีนี้มีดังนี้ -
public Map metrics()
ส่งคืนแผนที่ของเมตริกภายในที่ดูแลโดยผู้สร้าง
public void close () - คลาส KafkaProducer จัดเตรียมบล็อกวิธีการปิดจนกว่าคำขอที่ส่งก่อนหน้านี้ทั้งหมดจะเสร็จสมบูรณ์
Producer API
ส่วนกลางของ Producer API คือคลาสProducer
คลาส Producer มีตัวเลือกในการเชื่อมต่อโบรกเกอร์ Kafka ในตัวสร้างโดยวิธีการต่อไปนี้
คลาส Producer
คลาสผู้ผลิตจัดเตรียมวิธีการส่งไปยัง send ข้อความไปยังหัวข้อเดียวหรือหลายหัวข้อโดยใช้ลายเซ็นต่อไปนี้
public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);
ผู้ผลิตมีสองประเภท - Sync และ Async.
การกำหนดค่า API เดียวกันใช้กับผู้ผลิตSync
เช่นกัน ความแตกต่างระหว่างพวกเขาคือผู้ผลิตซิงค์ส่งข้อความโดยตรง แต่ส่งข้อความในพื้นหลัง ผู้ผลิต Async เป็นที่ต้องการเมื่อคุณต้องการทรูพุตที่สูงขึ้น ในรีลีสก่อนหน้านี้เช่น 0.8 ผู้ผลิต async ไม่มีการเรียกกลับสำหรับ send () เพื่อลงทะเบียนตัวจัดการข้อผิดพลาด สิ่งนี้มีให้เฉพาะในรุ่นปัจจุบัน 0.9
โมฆะสาธารณะปิด ()
คลาส Producer ให้ close วิธีปิดการเชื่อมต่อพูลผู้ผลิตกับ Kafka bro-kers ทั้งหมด
การตั้งค่าการกำหนดค่า
การตั้งค่าคอนฟิกูเรชันหลักของ Producer API แสดงอยู่ในตารางต่อไปนี้เพื่อให้อยู่ในสถานะต่ำกว่า -
ส. เลขที่ | การตั้งค่าการกำหนดค่าและคำอธิบาย |
---|---|
1 | client.id ระบุแอปพลิเคชันผู้ผลิต |
2 | producer.type ไม่ว่าจะซิงค์หรือไม่ซิงค์ |
3 | acks การกำหนดค่า acks ควบคุมเกณฑ์ภายใต้คำร้องขอของผู้ผลิตนั้นเป็นไปตามความสมบูรณ์ |
4 | retries หากคำขอโปรดิวเซอร์ล้มเหลวให้ลองใหม่โดยอัตโนมัติด้วยค่าเฉพาะ |
5 | bootstrap.servers bootstrapping รายชื่อโบรกเกอร์ |
6 | linger.ms หากคุณต้องการลดจำนวนคำขอคุณสามารถตั้งค่า linger.ms เป็นค่าที่มากกว่าค่าบางค่าได้ |
7 | key.serializer คีย์สำหรับอินเทอร์เฟซ Serializer |
8 | value.serializer ค่าสำหรับอินเทอร์เฟซ Serializer |
9 | batch.size ขนาดบัฟเฟอร์ |
10 | buffer.memory ควบคุมจำนวนหน่วยความจำทั้งหมดที่พร้อมใช้งานสำหรับผู้สร้างสำหรับการบัฟเฟอร์ |
ProducerRecord API
ProducerRecord เป็นคู่คีย์ / ค่าที่ส่งไปยังคลัสเตอร์ Kafka ตัวสร้างคลาส ProducerRecord สำหรับสร้างเรกคอร์ดที่มีพาร์ติชันคู่คีย์และค่าโดยใช้ลายเซ็นต่อไปนี้
public ProducerRecord (string topic, int partition, k key, v value)
Topic - ชื่อหัวข้อที่ผู้ใช้กำหนดซึ่งจะต่อท้ายเพื่อบันทึก
Partition - จำนวนพาร์ติชัน
Key - คีย์ที่จะรวมอยู่ในบันทึก
- Value - บันทึกเนื้อหา
public ProducerRecord (string topic, k key, v value)
ตัวสร้างคลาส ProducerRecord ใช้เพื่อสร้างเรกคอร์ดที่มีคีย์คู่ค่าและไม่มีพาร์ติชัน
Topic - สร้างหัวข้อเพื่อกำหนดบันทึก
Key - คีย์สำหรับบันทึก
Value - บันทึกเนื้อหา
public ProducerRecord (string topic, v value)
คลาส ProducerRecord สร้างเร็กคอร์ดโดยไม่มีพาร์ติชันและคีย์
Topic - สร้างหัวข้อ
Value - บันทึกเนื้อหา
เมธอดคลาส ProducerRecord แสดงอยู่ในตารางต่อไปนี้ -
ส. เลขที่ | วิธีการเรียนและคำอธิบาย |
---|---|
1 | public string topic() หัวข้อจะต่อท้ายบันทึก |
2 | public K key() คีย์ที่จะรวมอยู่ในบันทึก หากไม่มีคีย์ดังกล่าวค่าว่างจะถูกเปลี่ยนใหม่ที่นี่ |
3 | public V value() บันทึกเนื้อหา |
4 | partition() จำนวนพาร์ติชันสำหรับบันทึก |
แอปพลิเคชั่น SimpleProducer
ก่อนสร้างแอปพลิเคชันขั้นแรกให้เริ่มโบรกเกอร์ ZooKeeper และ Kafka จากนั้นสร้างหัวข้อของคุณเองในโบรกเกอร์คาฟคาโดยใช้คำสั่งสร้างหัวข้อ หลังจากนั้นสร้างคลาส java ชื่อSim-pleProducer.java
แล้วพิมพ์ coding ต่อไปนี้
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
Compilation - สามารถคอมไพล์แอปพลิเคชันได้โดยใช้คำสั่งต่อไปนี้
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution - แอปพลิเคชันสามารถดำเนินการได้โดยใช้คำสั่งต่อไปนี้
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
Output
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10
ตัวอย่างผู้บริโภคอย่างง่าย
ณ ตอนนี้เราได้สร้างผู้ผลิตเพื่อส่งข้อความไปยังคลัสเตอร์ Kafka ตอนนี้ให้เราสร้างผู้บริโภคเพื่อบริโภคข้อความจากคลัสเตอร์ Kafka KafkaConsumer API ใช้เพื่อบริโภคข้อความจากคลัสเตอร์ Kafka ตัวสร้างคลาส KafkaConsumer ถูกกำหนดไว้ด้านล่าง
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - ส่งคืนแผนที่ของการกำหนดค่าผู้บริโภค
คลาส KafkaConsumer มีวิธีการที่สำคัญดังต่อไปนี้ซึ่งแสดงไว้ในตารางด้านล่าง
ส. เลขที่ | วิธีการและคำอธิบาย |
---|---|
1 | public java.util.Set<TopicPar-tition> assignment() รับชุดของพาร์ติชั่นที่ผู้ผลิตคอนซูเมอร์มอบหมาย |
2 | public string subscription() สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก |
3 | public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก |
4 | public void unsubscribe() ยกเลิกการสมัครหัวข้อจากรายการพาร์ติชันที่กำหนด |
5 | public void sub-scribe(java.util.List<java.lang.String> topics) สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก หากรายการหัวข้อที่ระบุว่างเปล่าจะถือว่าเหมือนกับการยกเลิกการสมัคร () |
6 | public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) รูปแบบอาร์กิวเมนต์หมายถึงรูปแบบการสมัครสมาชิกในรูปแบบของนิพจน์ทั่วไปและอาร์กิวเมนต์ Listener จะได้รับการแจ้งเตือนจากรูปแบบการสมัครสมาชิก |
7 | public void as-sign(java.util.List<TopicParti-tion> partitions) กำหนดรายการพาร์ติชันให้กับลูกค้าด้วยตนเอง |
8 | poll() ดึงข้อมูลสำหรับหัวข้อหรือพาร์ติชันที่ระบุโดยใช้หนึ่งในสมัคร / กำหนด API สิ่งนี้จะส่งคืนข้อผิดพลาดหากไม่ได้สมัครหัวข้อก่อนการสำรวจข้อมูล |
9 | public void commitSync() คอมมิตออฟเซ็ตที่ส่งคืนในแบบสำรวจล่าสุด () สำหรับรายการหัวข้อและพาร์ติชันย่อยที่เขียนไว้ทั้งหมด การดำเนินการเดียวกันนี้ถูกนำไปใช้กับคอมมิต Asyn () |
10 | public void seek(TopicPartition partition, long offset) ดึงค่าชดเชยปัจจุบันที่ผู้บริโภคจะใช้ในวิธีการสำรวจความคิดเห็น () ถัดไป |
11 | public void resume() ดำเนินการต่อพาร์ติชันที่หยุดชั่วคราว |
12 | public void wakeup() ปลุกผู้บริโภค |
ConsumerRecord API
ConsumerRecord API ใช้เพื่อรับเรกคอร์ดจากคลัสเตอร์ Kafka API นี้ประกอบด้วยชื่อหัวข้อหมายเลขพาร์ติชันซึ่งจะได้รับเร็กคอร์ดและออฟเซ็ตที่ชี้ไปยังเร็กคอร์ดในพาร์ติชัน Kafka คลาส ConsumerRecord ใช้เพื่อสร้างเรกคอร์ดผู้บริโภคที่มีชื่อหัวข้อเฉพาะจำนวนพาร์ติชันและคู่ <คีย์ค่า> มีลายเซ็นดังต่อไปนี้
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
Topic - ชื่อหัวข้อสำหรับบันทึกผู้บริโภคที่ได้รับจากคลัสเตอร์ Kafka
Partition - พาร์ติชันสำหรับหัวข้อ
Key - คีย์ของเรกคอร์ดหากไม่มีคีย์อยู่จะถูกส่งกลับ
Value - บันทึกเนื้อหา
ConsumerRecords API
ConsumerRecords API ทำหน้าที่เป็นคอนเทนเนอร์สำหรับ ConsumerRecord API นี้ใช้เพื่อเก็บรายการ ConsumerRecord ต่อพาร์ติชันสำหรับหัวข้อเฉพาะ ตัวสร้างของมันถูกกำหนดไว้ด้านล่าง
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
TopicPartition - ส่งคืนแผนที่พาร์ติชันสำหรับหัวข้อใดหัวข้อหนึ่ง
Records - รายการส่งคืน ConsumerRecord
คลาส ConsumerRecords มีการกำหนดวิธีการดังต่อไปนี้
ส. เลขที่ | วิธีการและคำอธิบาย |
---|---|
1 | public int count() จำนวนบันทึกสำหรับหัวข้อทั้งหมด |
2 | public Set partitions() ชุดของพาร์ติชันที่มีข้อมูลในชุดระเบียนนี้ (หากไม่มีการส่งคืนข้อมูลชุดนั้นจะว่างเปล่า) |
3 | public Iterator iterator() Iterator ช่วยให้คุณสามารถวนรอบคอลเลกชันรับหรือย้ายองค์ประกอบได้ |
4 | public List records() รับรายการบันทึกสำหรับพาร์ติชันที่กำหนด |
การตั้งค่าการกำหนดค่า
การตั้งค่าคอนฟิกูเรชันสำหรับการตั้งค่าคอนฟิกูเรชันหลักของ Consumer client API แสดงไว้ด้านล่าง -
ส. เลขที่ | การตั้งค่าและคำอธิบาย |
---|---|
1 | bootstrap.servers Bootstrapping รายชื่อโบรกเกอร์ |
2 | group.id กำหนดผู้บริโภคแต่ละรายให้กับกลุ่ม |
3 | enable.auto.commit เปิดใช้งานการคอมมิตอัตโนมัติสำหรับการชดเชยหากค่าเป็นจริงมิฉะนั้นจะไม่ถูกคอมมิต |
4 | auto.commit.interval.ms ย้อนกลับว่าจะเขียนค่าออฟเซ็ตที่บริโภคที่อัปเดตบ่อยเพียงใดไปยัง ZooKeeper |
5 | session.timeout.ms ระบุจำนวนมิลลิวินาทีที่ Kafka จะรอให้ ZooKeeper ตอบกลับคำขอ (อ่านหรือเขียน) ก่อนที่จะยอมแพ้และใช้ข้อความต่อไป |
แอปพลิเคชั่น SimpleConsumer
ขั้นตอนการสมัครผู้ผลิตยังคงเหมือนเดิมที่นี่ ขั้นแรกเริ่มนายหน้า ZooKeeper และ Kafka จากนั้นสร้างแอปพลิเคชันSimpleConsumer
ด้วยคลาส java ชื่อSimpleCon-sumer.java
และพิมพ์รหัสต่อไปนี้
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Compilation - สามารถคอมไพล์แอปพลิเคชันได้โดยใช้คำสั่งต่อไปนี้
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution − แอปพลิเคชันสามารถดำเนินการได้โดยใช้คำสั่งต่อไปนี้
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
Input- เปิดผู้ผลิต CLI และส่งข้อความไปยังหัวข้อ คุณสามารถใส่ smple input เป็น 'Hello Consumer'
Output - ต่อไปนี้จะเป็นผลลัพธ์
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
กลุ่มผู้บริโภคคือการใช้งานแบบหลายเธรดหรือหลายเครื่องจากหัวข้อ Kafka
กลุ่มผู้บริโภค
ผู้บริโภคสามารถเข้าร่วมกลุ่มได้โดยใช้
group.id
เดียวกันความขนานสูงสุดของกลุ่มคือจำนวนผู้บริโภคในกลุ่ม←ไม่มีพาร์ติชัน
Kafka กำหนดพาร์ติชันของหัวข้อให้กับผู้บริโภคในกลุ่มเพื่อให้แต่ละพาร์ติชันถูกใช้โดยผู้บริโภคเพียงคนเดียวในกลุ่ม
Kafka รับประกันว่าข้อความจะถูกอ่านโดยผู้บริโภครายเดียวในกลุ่มเท่านั้น
ผู้บริโภคสามารถดูข้อความตามลำดับที่จัดเก็บไว้ในบันทึก
การปรับสมดุลของผู้บริโภคอีกครั้ง
การเพิ่มกระบวนการ / เธรดมากขึ้นจะทำให้ Kafka ปรับสมดุลใหม่ หากผู้บริโภคหรือนายหน้ารายใดไม่สามารถส่ง heartbeat ไปยัง ZooKeeper ก็สามารถกำหนดค่าใหม่ได้ผ่านคลัสเตอร์ Kafka ในระหว่างการปรับสมดุลใหม่ Kafka จะกำหนดพาร์ติชันที่พร้อมใช้งานให้กับเธรดที่มีอยู่โดยอาจย้ายพาร์ติชันไปยังกระบวนการอื่น
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
การรวบรวม
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
การดำเนินการ
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
ที่นี่เราได้สร้างชื่อกลุ่มตัวอย่างเป็นmy-group
กับผู้บริโภคสองคน ในทำนองเดียวกันคุณสามารถสร้างกลุ่มและจำนวนผู้บริโภคในกลุ่มได้
อินพุต
เปิดผู้ผลิต CLI และส่งข้อความเช่น -
Test consumer group 01
Test consumer group 02
ผลลัพธ์ของกระบวนการแรก
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
ผลลัพธ์ของกระบวนการที่สอง
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
ตอนนี้หวังว่าคุณจะเข้าใจ SimpleConsumer และ ConsumeGroup โดยใช้การสาธิตไคลเอ็นต์ Java ตอนนี้คุณมีแนวคิดเกี่ยวกับวิธีการส่งและรับข้อความโดยใช้ไคลเอ็นต์ Java ให้เราดำเนินการรวม Kafka กับเทคโนโลยีข้อมูลขนาดใหญ่ในบทต่อไป
ในบทนี้เราจะเรียนรู้วิธีการรวม Kafka กับ Apache Storm
เกี่ยวกับ Storm
Storm ถูกสร้างขึ้นโดย Nathan Marz และทีมงานที่ BackType ในช่วงเวลาสั้น ๆ Apache Storm กลายเป็นมาตรฐานสำหรับระบบประมวลผลแบบเรียลไทม์แบบกระจายซึ่งช่วยให้คุณประมวลผลข้อมูลจำนวนมหาศาลได้ สตอร์มเร็วมากและค่ามาตรฐานโอเวอร์คล็อกด้วยการประมวลผลมากกว่าหนึ่งล้านทูเปิลต่อวินาทีต่อโหนด Apache Storm ทำงานอย่างต่อเนื่องโดยใช้ข้อมูลจากแหล่งที่กำหนด (Spouts) และส่งข้อมูลไปยังท่อประมวลผล (Bolts) Com-bined Spouts และ Bolts สร้างโทโพโลยี
บูรณาการกับ Storm
Kafka และ Storm เสริมซึ่งกันและกันอย่างเป็นธรรมชาติและความร่วมมืออันทรงพลังของพวกเขาช่วยให้การวิเคราะห์สตรีมมิ่งแบบเรียลไทม์สำหรับข้อมูลขนาดใหญ่ที่เคลื่อนไหวอย่างรวดเร็ว การรวม Kafka และ Storm ช่วยให้นักพัฒนาสามารถนำเข้าและเผยแพร่สตรีมข้อมูลจากโทโพโลยี Storm ได้ง่ายขึ้น
กระแสความคิด
พวยกาเป็นแหล่งที่มาของลำธาร ตัวอย่างเช่นพวยกาอาจอ่านสิ่งที่เพิ่มขึ้นจากหัวข้อคาฟคาและส่งออกเป็นสตรีม โบลต์ใช้สตรีมอินพุตกระบวนการและอาจส่งกระแสข้อมูลใหม่ Bolts สามารถทำอะไรก็ได้ตั้งแต่การเรียกใช้ฟังก์ชันการกรองสิ่งที่เพิ่มขึ้นการรวมการสตรีมการรวมการสตรีมการพูดคุยกับฐานข้อมูลและอื่น ๆ แต่ละโหนดในโทโพโลยี Storm ทำงานแบบขนาน โทโพโลยีทำงานไปเรื่อย ๆ จนกว่าคุณจะยุติ Storm จะมอบหมายงานที่ล้มเหลวโดยอัตโนมัติ นอกจากนี้สตอร์มยังรับประกันว่าจะไม่มีการสูญหายของข้อมูลแม้ว่าเครื่องจะล่มและข้อความหลุดก็ตาม
ให้เราดูรายละเอียดเกี่ยวกับ API การรวม Kafka-Storm มีสามคลาสหลักในการรวม Kafka กับ Storm มีดังนี้ -
BrokerHosts - ZkHosts & StaticHosts
BrokerHosts เป็นอินเทอร์เฟซและ ZkHosts และ StaticHosts เป็นสองการใช้งานหลัก ZkHosts ใช้เพื่อติดตามโบรกเกอร์ Kafka แบบไดนามิกโดยการรักษารายละเอียดใน ZooKeeper ในขณะที่ StaticHosts ใช้เพื่อตั้งค่าโบรกเกอร์ Kafka และรายละเอียดด้วยตนเอง / แบบคงที่ ZkHosts เป็นวิธีที่ง่ายและรวดเร็วในการเข้าถึงโบรกเกอร์ Kafka
ลายเซ็นของ ZkHosts มีดังต่อไปนี้ -
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
โดยที่นายหน้า ZkStr คือโฮสต์ ZooKeeper และนายหน้า ZkPath คือเส้นทาง ZooKeeper เพื่อรักษารายละเอียดนายหน้าของคาฟคา
KafkaConfig API
API นี้ใช้เพื่อกำหนดการตั้งค่าคอนฟิกสำหรับคลัสเตอร์ Kafka ลายเซ็นของ Kafka Con-fig ถูกกำหนดไว้ดังนี้
public KafkaConfig(BrokerHosts hosts, string topic)
Hosts - BrokerHosts สามารถเป็น ZkHosts / StaticHosts
Topic - ชื่อหัวข้อ
SpoutConfig API
Spoutconfig เป็นส่วนขยายของ KafkaConfig ที่รองรับข้อมูล ZooKeeper เพิ่มเติม
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
Hosts - BrokerHosts สามารถใช้งานอินเทอร์เฟซ BrokerHosts ได้
Topic - ชื่อหัวข้อ
zkRoot - เส้นทางราก ZooKeeper
id −พวยกาเก็บสถานะของการชดเชยที่บริโภคใน Zookeeper รหัสควรระบุพวยกาของคุณโดยไม่ซ้ำกัน
SchemeAsMultiScheme
SchemeAsMultiScheme เป็นอินเทอร์เฟซที่กำหนดวิธีการที่ ByteBuffer ที่ใช้จาก Kafka จะเปลี่ยนเป็นพายุทูเพิล ได้มาจาก MultiScheme และยอมรับการใช้งานคลาส Scheme มีการใช้งานคลาส Scheme จำนวนมากและการใช้งานแบบนั้นคือ StringScheme ซึ่งแยกวิเคราะห์ไบต์เป็นสตริงธรรมดา นอกจากนี้ยังควบคุมการตั้งชื่อฟิลด์เอาต์พุตของคุณ ลายเซ็นถูกกำหนดไว้ดังนี้
public SchemeAsMultiScheme(Scheme scheme)
Scheme - บัฟเฟอร์ไบต์ที่บริโภคจากคาฟคา
KafkaSpout API
KafkaSpout คือการใช้งานพวยกาของเราซึ่งจะทำงานร่วมกับ Storm มันดึง mes-sages จากหัวข้อ kafka และส่งมันไปยังระบบนิเวศของ Storm ในรูปแบบ tuples KafkaSpout รับรายละเอียดการกำหนดค่าจาก SpoutConfig
ด้านล่างนี้เป็นโค้ดตัวอย่างเพื่อสร้าง Kafka spout แบบง่ายๆ
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
topicName, "/" + topicName UUID.randomUUID().toString());
//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
การสร้างกลอน
Bolt เป็นส่วนประกอบที่รับทูเปิลเป็นอินพุตประมวลผลทูเพิลและสร้างสิ่งทอปเปิลใหม่เป็นเอาต์พุต Bolts จะใช้อินเทอร์เฟซ IRichBolt ในโปรแกรมนี้มีการใช้โบลต์สองคลาส WordSplitter-Bolt และ WordCounterBolt เพื่อดำเนินการ
อินเทอร์เฟซ IRichBolt มีวิธีการดังต่อไปนี้ -
Prepare- จัดเตรียมโบลต์พร้อมสภาพแวดล้อมในการดำเนินการ ตัวดำเนินการจะเรียกใช้วิธีนี้เพื่อเริ่มต้นพวยกา
Execute - ประมวลผลอินพุตทูเพิลเดียว
Cleanup - เรียกว่าเมื่อสายฟ้ากำลังจะปิดลง
declareOutputFields - ประกาศสคีมาผลลัพธ์ของทูเปิล
ให้เราสร้าง SplitBolt.java ซึ่งใช้ตรรกะในการแยกประโยคออกเป็นคำและ CountBolt.java ซึ่งใช้ตรรกะในการแยกคำที่ไม่ซ้ำกันและนับจำนวนที่เกิดขึ้น
SplitBolt.java
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word: words) {
word = word.trim();
if(!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void cleanup() {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
CountBolt.java
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
ส่งไปยังโทโพโลยี
โทโพโลยีแบบพายุเป็นโครงสร้างแบบ Thrift คลาส TopologyBuilder มีวิธีการที่ง่ายและสะดวกในการสร้างโทโพโลยีที่ซับซ้อน คลาส TopologyBuilder มีเมธอดในการตั้งพวยกา (setSpout) และตั้งค่าโบลต์ (setBolt) ในที่สุด TopologyBuilder ได้ createTopology เพื่อสร้าง to-pology วิธี shuffleGrouping และ fieldsGrouping ช่วยในการตั้งค่าการจัดกลุ่มสตรีมสำหรับพวยกาและสลักเกลียว
Local Cluster- เพื่อวัตถุประสงค์ในการพัฒนาเราสามารถสร้างคลัสเตอร์ท้องถิ่นโดยใช้LocalCluster
วัตถุแล้วส่ง topology โดยใช้submitTopology
วิธีการLocalCluster
ระดับ
KafkaStormSample.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "my-first-topic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());
Thread.sleep(10000);
cluster.shutdown();
}
}
ก่อนที่จะย้ายการรวบรวมการรวม Kakfa-Storm ต้องการไลบรารี java ไคลเอนต์ ZooKeeper ผู้ดูแล ผู้ดูแลเวอร์ชัน 2.9.1 รองรับ Apache Storm เวอร์ชัน 0.9.5 (ซึ่งเราใช้ในบทช่วยสอนนี้) ดาวน์โหลดไฟล์ jar ที่ระบุด้านล่างและวางไว้ในพา ธ คลาส java
- curator-client-2.9.1.jar
- curator-framework-2.9.1.jar
หลังจากรวมไฟล์อ้างอิงแล้วให้คอมไพล์โปรแกรมโดยใช้คำสั่งต่อไปนี้
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
การดำเนินการ
เริ่ม Kafka Producer CLI (อธิบายไว้ในบทก่อนหน้า) สร้างหัวข้อใหม่ชื่อmy-first-topic
และให้ข้อความตัวอย่างตามที่แสดงด้านล่าง -
hello
kafka
storm
spark
test message
another test message
ตอนนี้เรียกใช้แอปพลิเคชันโดยใช้คำสั่งต่อไปนี้ -
java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample
ตัวอย่างผลลัพธ์ของแอปพลิเคชันนี้ระบุไว้ด้านล่าง -
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2
ในบทนี้เราจะพูดถึงวิธีการรวม Apache Kafka กับ Spark Streaming API
เกี่ยวกับ Spark
Spark Streaming API ช่วยให้สามารถประมวลผลสตรีมข้อมูลสดที่ปรับขนาดได้ปริมาณงานสูงและทนต่อข้อผิดพลาด ข้อมูลสามารถนำเข้าจากหลายแหล่งเช่น Kafka, Flume, Twitter และอื่น ๆ และสามารถประมวลผลโดยใช้อัลกอริทึมที่ซับซ้อนเช่นฟังก์ชันระดับสูงเช่นแผนที่, ลด, เข้าร่วมและหน้าต่าง ในที่สุดข้อมูลที่ประมวลผลแล้วสามารถส่งออกไปยังระบบไฟล์ฐานข้อมูลและแดชบอร์ดแบบสดได้ Resilient Distributed Datasets (RDD) เป็นโครงสร้างข้อมูลพื้นฐานของ Spark มันเป็นคอลเลกชันของวัตถุที่กระจายไม่เปลี่ยนรูป ชุดข้อมูลแต่ละชุดใน RDD จะแบ่งออกเป็นโลจิคัลพาร์ติชันซึ่งอาจคำนวณจากโหนดต่างๆของคลัสเตอร์
บูรณาการกับ Spark
Kafka เป็นแพลตฟอร์มการส่งข้อความและการผสานรวมที่มีศักยภาพสำหรับสตรีมมิ่ง Spark Kafka ทำหน้าที่เป็นศูนย์กลางสำหรับสตรีมข้อมูลแบบเรียลไทม์และประมวลผลโดยใช้อัลกอริทึมที่ซับซ้อนใน Spark Streaming เมื่อประมวลผลข้อมูลแล้ว Spark Streaming อาจเผยแพร่ผลลัพธ์ในหัวข้อ Kafka อื่นหรือจัดเก็บใน HDFS ฐานข้อมูลหรือแดชบอร์ด แผนภาพต่อไปนี้แสดงให้เห็นถึงการไหลของแนวคิด
ตอนนี้ให้เราดูรายละเอียดเกี่ยวกับ Kafka-Spark API
SparkConf API
แสดงถึงการกำหนดค่าสำหรับแอปพลิเคชัน Spark ใช้เพื่อตั้งค่าพารามิเตอร์ Spark ต่างๆเป็นคู่คีย์ - ค่า
คลาสSparkConf
มีวิธีการดังต่อไปนี้ -
set(string key, string value) - ตั้งค่าตัวแปรการกำหนดค่า
remove(string key) - ลบคีย์ออกจากการกำหนดค่า
setAppName(string name) - ตั้งชื่อแอปพลิเคชันสำหรับแอปพลิเคชันของคุณ
get(string key) - รับกุญแจ
StreamingContext API
นี่คือจุดเริ่มต้นหลักสำหรับฟังก์ชัน Spark SparkContext แสดงถึงการเชื่อมต่อกับคลัสเตอร์ Spark และสามารถใช้เพื่อสร้าง RDDs, ตัวสะสมและตัวแปรออกอากาศบนคลัสเตอร์ ลายเซ็นถูกกำหนดตามที่แสดงด้านล่าง
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master - คลัสเตอร์ URL ที่จะเชื่อมต่อ (เช่น mesos: // host: port, spark: // host: port, local [4])
appName - ชื่องานของคุณเพื่อแสดงบน UI เว็บคลัสเตอร์
batchDuration - ช่วงเวลาที่ข้อมูลการสตรีมจะถูกแบ่งออกเป็นแบทช์
public StreamingContext(SparkConf conf, Duration batchDuration)
สร้าง StreamingContext โดยจัดเตรียมคอนฟิกูเรชันที่จำเป็นสำหรับ SparkContext ใหม่
conf - พารามิเตอร์ Spark
batchDuration - ช่วงเวลาที่ข้อมูลการสตรีมจะถูกแบ่งออกเป็นแบทช์
KafkaUtils API
KafkaUtils API ใช้เพื่อเชื่อมต่อคลัสเตอร์ Kafka กับ Spark streaming API นี้มีวิธีการsignifi
-cant ลายเซ็นcreateStream ที่
กำหนดไว้ด้านล่าง
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
วิธีการที่แสดงด้านบนใช้เพื่อสร้างสตรีมอินพุตที่ดึงข้อความจาก Kafka Brokers
ssc - วัตถุ StreamingContext
zkQuorum - องค์ประชุมผู้ดูแลสวนสัตว์
groupId - รหัสกลุ่มสำหรับผู้บริโภครายนี้
topics - ส่งคืนแผนที่หัวข้อที่ต้องการบริโภค
storageLevel - ระดับการจัดเก็บเพื่อใช้สำหรับจัดเก็บวัตถุที่ได้รับ
KafkaUtils API มีวิธีการ createDirectStream อีกวิธีหนึ่งซึ่งใช้ในการสร้างสตรีมอินพุตที่ดึงข้อความจาก Kafka Brokers โดยตรงโดยไม่ต้องใช้เครื่องรับใด ๆ สตรีมนี้สามารถรับประกันได้ว่าแต่ละข้อความจาก Kafka จะรวมอยู่ในการเปลี่ยนแปลงเพียงครั้งเดียว
แอปพลิเคชันตัวอย่างทำได้ใน Scala ในการรวบรวมแอปพลิเคชันโปรดดาวน์โหลดและติดตั้งsbt
เครื่องมือสร้าง scala (คล้ายกับ maven) รหัสแอปพลิเคชันหลักแสดงอยู่ด้านล่าง
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
สร้างสคริปต์
การผสานรวม spark-kafka ขึ้นอยู่กับการจุดประกายการสตรีมและจุดประกายโถการรวม Kafka สร้างไฟล์ใหม่build.sbt
และระบุรายละเอียดแอปพลิเคชันและการอ้างอิง SBT
จะดาวน์โหลดขวดที่จำเป็นในขณะที่รวบรวมและการบรรจุแอพลิเคชัน
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
การรวบรวม / บรรจุภัณฑ์
รันคำสั่งต่อไปนี้เพื่อคอมไพล์และแพ็กเกจไฟล์ jar ของแอ็พพลิเคชัน เราจำเป็นต้องส่งไฟล์ jar ลงในคอนโซล spark เพื่อเรียกใช้แอปพลิเคชัน
sbt package
ส่งไปยัง Spark
เริ่ม Kafka Producer CLI (อธิบายไว้ในบทก่อนหน้า) สร้างหัวข้อใหม่ชื่อmy-first-topic
และจัดเตรียมข้อความตัวอย่างตามที่แสดงด้านล่าง
Another spark test message
เรียกใช้คำสั่งต่อไปนี้เพื่อส่งแอปพลิเคชันไปยัง spark Console
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
ผลลัพธ์ตัวอย่างของแอปพลิเคชันนี้แสดงไว้ด้านล่าง
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..
ให้เราวิเคราะห์แอปพลิเคชันแบบเรียลไทม์เพื่อรับฟีด Twitter ล่าสุดและแฮชแท็ก ก่อนหน้านี้เราได้เห็นการรวม Storm และ Spark เข้ากับ Kafka ในทั้งสองสถานการณ์เราได้สร้าง Kafka Producer (โดยใช้ cli) เพื่อส่งข้อความไปยังระบบนิเวศของ Kafka จากนั้นพายุและประกายไฟจะอ่านข้อความโดยใช้ผู้บริโภค Kafka และฉีดเข้าไปในระบบนิเวศของพายุและประกายไฟตามลำดับ ดังนั้นในทางปฏิบัติเราจำเป็นต้องสร้าง Kafka Producer ซึ่งควรจะ -
- อ่านฟีด Twitter โดยใช้“ Twitter Streaming API”
- ประมวลผลฟีด
- แยก HashTags และ
- ส่งไปที่คาฟคา
เมื่อKafka
ได้รับHashTags
แล้วการผสานรวม Storm / Spark จะได้รับ infor-mation และส่งไปยังระบบนิเวศ Storm / Spark
Twitter Streaming API
คุณสามารถเข้าถึง“ Twitter Streaming API” ในภาษาโปรแกรมใดก็ได้ “ twitter4j” เป็นไลบรารี Java แบบโอเพนซอร์สที่ไม่เป็นทางการซึ่งมีโมดูลที่ใช้ Java เพื่อเข้าถึง“ Twitter Streaming API” ได้อย่างง่ายดาย "twitter4j" เป็นกรอบงานสำหรับผู้ฟังเพื่อเข้าถึงทวีต ในการเข้าถึง“ Twitter Streaming API” เราจำเป็นต้องลงชื่อเข้าใช้บัญชีผู้พัฒนา Twitter และควรได้รับสิ่งต่อไปนี้OAuth รายละเอียดการรับรองความถูกต้อง
- Customerkey
- CustomerSecret
- AccessToken
- AccessTookenSecret
เมื่อสร้างบัญชีผู้พัฒนาแล้วให้ดาวน์โหลดไฟล์ jar“ twitter4j” และวางไว้ในพา ธ คลาส java
การเข้ารหัสผู้ผลิต Twitter Kafka ที่สมบูรณ์ (KafkaTwitterProducer.java) อยู่ด้านล่าง -
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.*;
import twitter4j.conf.*;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaTwitterProducer {
public static void main(String[] args) throws Exception {
LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
if(args.length < 5){
System.out.println(
"Usage: KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret> <twitter-access-token>
<twitter-access-token-secret>
<topic-name> <twitter-search-keywords>");
return;
}
String consumerKey = args[0].toString();
String consumerSecret = args[1].toString();
String accessToken = args[2].toString();
String accessTokenSecret = args[3].toString();
String topicName = args[4].toString();
String[] arguments = args.clone();
String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(consumerKey)
.setOAuthConsumerSecret(consumerSecret)
.setOAuthAccessToken(accessToken)
.setOAuthAccessTokenSecret(accessTokenSecret);
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
// System.out.println("@" + status.getUser().getScreenName()
+ " - " + status.getText());
// System.out.println("@" + status.getUser().getScreen-Name());
/*for(URLEntity urle : status.getURLEntities()) {
System.out.println(urle.getDisplayURL());
}*/
/*for(HashtagEntity hashtage : status.getHashtagEntities()) {
System.out.println(hashtage.getText());
}*/
}
@Override
public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
// System.out.println("Got a status deletion notice id:"
+ statusDeletionNotice.getStatusId());
}
@Override
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
// System.out.println("Got track limitation notice:" +
num-berOfLimitedStatuses);
}
@Override
public void onScrubGeo(long userId, long upToStatusId) {
// System.out.println("Got scrub_geo event userId:" + userId +
"upToStatusId:" + upToStatusId);
}
@Override
public void onStallWarning(StallWarning warning) {
// System.out.println("Got stall warning:" + warning);
}
@Override
public void onException(Exception ex) {
ex.printStackTrace();
}
};
twitterStream.addListener(listener);
FilterQuery query = new FilterQuery().track(keyWords);
twitterStream.filter(query);
Thread.sleep(5000);
//Add Kafka producer config settings
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int i = 0;
int j = 0;
while(i < 10) {
Status ret = queue.poll();
if (ret == null) {
Thread.sleep(100);
i++;
}else {
for(HashtagEntity hashtage : ret.getHashtagEntities()) {
System.out.println("Hashtag: " + hashtage.getText());
producer.send(new ProducerRecord<String, String>(
top-icName, Integer.toString(j++), hashtage.getText()));
}
}
}
producer.close();
Thread.sleep(5000);
twitterStream.shutdown();
}
}
การรวบรวม
คอมไพล์แอปพลิเคชันโดยใช้คำสั่งต่อไปนี้ -
javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java
การดำเนินการ
เปิดสองคอนโซล เรียกใช้แอปพลิเคชันที่รวบรวมไว้ด้านบนดังที่แสดงด้านล่างในคอนโซลเดียว
java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food
เรียกใช้แอปพลิเคชัน Spark / Storm ใด ๆ ที่อธิบายไว้ในบทก่อนหน้าใน win-dow อื่น ประเด็นหลักที่ควรทราบคือหัวข้อที่ใช้ควรเหมือนกันในทั้งสองกรณี ที่นี่เราใช้“ my-first-topic” เป็นชื่อหัวข้อ
เอาต์พุต
ผลลัพธ์ของแอปพลิเคชั่นนี้จะขึ้นอยู่กับคีย์เวิร์ดและฟีดปัจจุบันของ twitter มีการระบุเอาต์พุตตัวอย่างด้านล่าง (การรวมพายุ)
. . .
food : 1
foodie : 2
burger : 1
. . .
Kafka Tool บรรจุภายใต้“ org.apache.kafka.tools. *. เครื่องมือแบ่งออกเป็นเครื่องมือระบบและเครื่องมือจำลองแบบ
เครื่องมือระบบ
เครื่องมือระบบสามารถเรียกใช้จากบรรทัดคำสั่งโดยใช้สคริปต์คลาสรัน ไวยากรณ์มีดังนี้ -
bin/kafka-run-class.sh package.class - - options
เครื่องมือระบบบางส่วนมีการกล่าวถึงด้านล่าง -
Kafka Migration Tool - เครื่องมือนี้ใช้ในการโยกย้ายนายหน้าจากเวอร์ชันหนึ่งไปยังอีกเวอร์ชันหนึ่ง
Mirror Maker - เครื่องมือนี้ใช้เพื่อทำการมิเรอร์คลัสเตอร์ Kafka หนึ่งไปยังอีกคลัสเตอร์
Consumer Offset Checker - เครื่องมือนี้แสดงกลุ่มผู้บริโภคหัวข้อพาร์ทิชันนอกสถานที่ล็อกขนาดเจ้าของสำหรับชุดหัวข้อและกลุ่มผู้บริโภคที่ระบุ
เครื่องมือจำลองแบบ
การจำลองแบบคาฟคาเป็นเครื่องมือการออกแบบระดับสูง วัตถุประสงค์ของการเพิ่มเครื่องมือจำลองแบบคือเพื่อความทนทานที่แข็งแกร่งและความพร้อมใช้งานที่สูงขึ้น เครื่องมือจำลองแบบบางส่วนมีการกล่าวถึงด้านล่าง -
Create Topic Tool - สิ่งนี้จะสร้างหัวข้อที่มีจำนวนพาร์ติชันเริ่มต้นปัจจัยการจำลองและใช้โครงร่างเริ่มต้นของ Kafka ในการกำหนดแบบจำลอง
List Topic Tool- เครื่องมือนี้แสดงข้อมูลสำหรับรายการหัวข้อที่กำหนด หากไม่มีหัวข้อในบรรทัดคำสั่งเครื่องมือจะสอบถาม Zookeeper เพื่อรับหัวข้อทั้งหมดและแสดงข้อมูลสำหรับหัวข้อเหล่านั้น ฟิลด์ที่เครื่องมือแสดง ได้แก่ ชื่อหัวข้อพาร์ติชันผู้นำแบบจำลอง isr
Add Partition Tool- การสร้างหัวข้อต้องระบุจำนวนพาร์ติชันสำหรับหัวข้อ ในภายหลังอาจจำเป็นต้องใช้พาร์ติชันเพิ่มเติมสำหรับหัวข้อเมื่อปริมาณของหัวข้อจะเพิ่มขึ้น เครื่องมือนี้ช่วยในการเพิ่มพาร์ติชันเพิ่มเติมสำหรับหัวข้อเฉพาะและยังช่วยให้การกำหนดแบบจำลองด้วยตนเองของพาร์ติชันที่เพิ่มเข้ามา
Kafka รองรับการใช้งานทางอุตสาหกรรมที่ดีที่สุดในปัจจุบันมากมาย เราจะให้ภาพรวมสั้น ๆ เกี่ยวกับแอปพลิเคชั่นที่โดดเด่นที่สุดของ Kafka ในบทนี้
ทวิตเตอร์
Twitter เป็นบริการเครือข่ายสังคมออนไลน์ที่มีแพลตฟอร์มในการส่งและรับทวีตของผู้ใช้ ผู้ใช้ที่ลงทะเบียนสามารถอ่านและโพสต์ทวีตได้ แต่ผู้ใช้ที่ไม่ได้ลงทะเบียนสามารถอ่านทวีตได้เท่านั้น Twitter ใช้ Storm-Kafka เป็นส่วนหนึ่งของโครงสร้างพื้นฐานการประมวลผลสตรีม
Apache Kafka ใช้ที่ LinkedIn สำหรับข้อมูลสตรีมกิจกรรมและเมตริกการดำเนินงาน ระบบ Kafka mes-saging ช่วย LinkedIn ด้วยผลิตภัณฑ์ต่างๆเช่น LinkedIn Newsfeed, LinkedIn Today สำหรับการใช้ข้อความออนไลน์และนอกเหนือจากระบบการวิเคราะห์ออฟไลน์เช่น Hadoop ความทนทานที่แข็งแกร่งของ Kafka เป็นปัจจัยสำคัญอย่างหนึ่งในการเชื่อมต่อกับ LinkedIn
Netflix
Netflix เป็นผู้ให้บริการสตรีมมิ่งสื่ออินเทอร์เน็ตตามความต้องการข้ามชาติข้ามชาติ Netflix ใช้ Kafka สำหรับการตรวจสอบแบบเรียลไทม์และการประมวลผลเหตุการณ์
Mozilla
Mozilla เป็นชุมชนซอฟต์แวร์ฟรีที่สร้างขึ้นในปี 1998 โดยสมาชิกของ Netscape Kafka จะเข้ามาแทนที่ส่วนหนึ่งของระบบการผลิตในปัจจุบันของ Mozilla เพื่อรวบรวมข้อมูลประสิทธิภาพและการใช้งานจากเบราว์เซอร์ของผู้ใช้ปลายทางสำหรับโครงการต่างๆเช่น Telemetry, Test Pilot เป็นต้น
Oracle
Oracle ให้การเชื่อมต่อแบบเนทีฟกับ Kafka จากผลิตภัณฑ์ Enterprise Service Bus ที่เรียกว่า OSB (Oracle Service Bus) ซึ่งช่วยให้นักพัฒนาสามารถใช้ประโยชน์จากความสามารถในการไกล่เกลี่ยในตัว OSB เพื่อใช้ไปป์ไลน์ข้อมูลแบบทีละขั้น