Apache Kafka - ตัวอย่างผู้ผลิตอย่างง่าย
ให้เราสร้างแอปพลิเคชันสำหรับเผยแพร่และใช้งานข้อความโดยใช้ไคลเอ็นต์ Java ไคลเอนต์ผู้ผลิต Kafka ประกอบด้วย API ต่อไปนี้
KafkaProducer API
ให้เราเข้าใจชุดที่สำคัญที่สุดของ Kafka Producer API ในส่วนนี้ ส่วนกลางของ KafkaProducer API คือคลาสKafkaProducer 
คลาส KafkaProducer มีตัวเลือกในการเชื่อมต่อโบรกเกอร์ Kafka ในตัวสร้างด้วยวิธีการต่อไปนี้
- คลาส KafkaProducer จัดเตรียมวิธีการส่งเพื่อส่งข้อความไปยังหัวข้อแบบอะซิงโครนัส ลายเซ็นของ send () มีดังต่อไปนี้ 
producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);- ProducerRecord - ผู้ผลิตจัดการบัฟเฟอร์ของเร็กคอร์ดที่รอการส่ง 
- Callback - การโทรกลับที่ผู้ใช้จัดหาเพื่อดำเนินการเมื่อเซิร์ฟเวอร์ได้รับการยอมรับว่าบันทึกได้รับการปรับปรุง (null หมายถึงไม่มีการโทรกลับ) 
- คลาส KafkaProducer มีวิธีการล้างเพื่อให้แน่ใจว่าข้อความที่ส่งไปก่อนหน้านี้เสร็จสมบูรณ์แล้วจริงๆ ไวยากรณ์ของวิธีการล้างมีดังนี้ - 
public void flush()- คลาส KafkaProducer จัดเตรียมเมธอด partitionFor ซึ่งช่วยในการรับข้อมูลเมตาของพาร์ติชันสำหรับหัวข้อที่กำหนด สามารถใช้สำหรับการแบ่งพาร์ติชันแบบกำหนดเอง ลายเซ็นของวิธีนี้มีดังนี้ - 
public Map metrics()ส่งคืนแผนที่ของเมตริกภายในที่ดูแลโดยผู้สร้าง
- public void close () - คลาส KafkaProducer จัดเตรียมบล็อกวิธีการปิดจนกว่าคำขอที่ส่งก่อนหน้านี้ทั้งหมดจะเสร็จสมบูรณ์ 
Producer API
ส่วนสำคัญของ Producer API คือคลาสProducer 
คลาส Producer มีตัวเลือกในการเชื่อมต่อโบรกเกอร์ Kafka ในตัวสร้างโดยวิธีการดังต่อไปนี้
คลาส Producer
คลาสผู้ผลิตจัดเตรียมวิธีการส่งไปยัง send ข้อความไปยังหัวข้อเดียวหรือหลายหัวข้อโดยใช้ลายเซ็นต่อไปนี้
public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);ผู้ผลิตมีสองประเภท - Sync และ Async.
การกำหนดค่า API เดียวกันใช้กับผู้ผลิตSync
เช่นกัน ความแตกต่างระหว่างพวกเขาคือผู้ผลิตซิงค์ส่งข้อความโดยตรง แต่ส่งข้อความในพื้นหลัง ผู้ผลิต Async เป็นที่ต้องการเมื่อคุณต้องการทรูพุตที่สูงขึ้น ในรีลีสก่อนหน้านี้เช่น 0.8 ผู้ผลิต async ไม่มีการเรียกกลับสำหรับ send () เพื่อลงทะเบียนตัวจัดการข้อผิดพลาด สิ่งนี้พร้อมใช้งานในรุ่นปัจจุบัน 0.9 เท่านั้น
โมฆะสาธารณะปิด ()
คลาส Producer ให้ close วิธีปิดการเชื่อมต่อพูลผู้ผลิตกับ Kafka bro-kers ทั้งหมด
การตั้งค่าการกำหนดค่า
การตั้งค่าคอนฟิกูเรชันหลักของ Producer API แสดงอยู่ในตารางต่อไปนี้เพื่อให้ดียิ่งขึ้น
| ส. เลขที่ | การตั้งค่าการกำหนดค่าและคำอธิบาย | 
|---|---|
| 1 | client.id ระบุแอปพลิเคชันผู้ผลิต | 
| 2 | producer.type ซิงค์หรือไม่ซิงค์ | 
| 3 | acks การกำหนดค่า acks ควบคุมเกณฑ์ภายใต้การร้องขอของผู้ผลิตนั้นเป็นไปตามความสมบูรณ์ | 
| 4 | retries หากคำขอโปรดิวเซอร์ล้มเหลวให้ลองใหม่โดยอัตโนมัติด้วยค่าเฉพาะ | 
| 5 | bootstrap.servers bootstrapping รายชื่อโบรกเกอร์ | 
| 6 | linger.ms หากคุณต้องการลดจำนวนคำขอคุณสามารถตั้งค่า linger.ms เป็นค่าที่มากกว่าค่าบางค่าได้ | 
| 7 | key.serializer คีย์สำหรับอินเทอร์เฟซ Serializer | 
| 8 | value.serializer ค่าสำหรับอินเทอร์เฟซ Serializer | 
| 9 | batch.size ขนาดบัฟเฟอร์ | 
| 10 | buffer.memory ควบคุมจำนวนหน่วยความจำทั้งหมดที่พร้อมใช้งานสำหรับผู้สร้างสำหรับการบัฟเฟอร์ | 
ProducerRecord API
ProducerRecord เป็นคู่คีย์ / ค่าที่ส่งไปยังคลัสเตอร์ Kafka ตัวสร้างคลาส ProducerRecord สำหรับสร้างเรกคอร์ดที่มีพาร์ติชันคู่คีย์และค่าโดยใช้ลายเซ็นต่อไปนี้
public ProducerRecord (string topic, int partition, k key, v value)- Topic - ชื่อหัวข้อที่ผู้ใช้กำหนดซึ่งจะต่อท้ายเพื่อบันทึก 
- Partition - จำนวนพาร์ติชัน 
- Key - คีย์ที่จะรวมอยู่ในบันทึก 
- Value - บันทึกเนื้อหา
public ProducerRecord (string topic, k key, v value)ตัวสร้างคลาส ProducerRecord ใช้เพื่อสร้างเรกคอร์ดที่มีคีย์คู่ค่าและไม่มีพาร์ติชัน
- Topic - สร้างหัวข้อเพื่อกำหนดบันทึก 
- Key - คีย์สำหรับบันทึก 
- Value - บันทึกเนื้อหา 
public ProducerRecord (string topic, v value)คลาส ProducerRecord สร้างเร็กคอร์ดโดยไม่มีพาร์ติชันและคีย์
- Topic - สร้างหัวข้อ 
- Value - บันทึกเนื้อหา 
เมธอดคลาส ProducerRecord แสดงอยู่ในตารางต่อไปนี้ -
| ส. เลขที่ | วิธีการเรียนและคำอธิบาย | 
|---|---|
| 1 | public string topic() หัวข้อจะต่อท้ายบันทึก | 
| 2 | public K key() คีย์ที่จะรวมอยู่ในบันทึก หากไม่มีคีย์ดังกล่าวค่าว่างจะถูกเปลี่ยนใหม่ที่นี่ | 
| 3 | public V value() บันทึกเนื้อหา | 
| 4 | partition() จำนวนพาร์ติชันสำหรับบันทึก | 
แอปพลิเคชั่น SimpleProducer
ก่อนสร้างแอปพลิเคชันขั้นแรกให้เริ่มโบรกเกอร์ ZooKeeper และ Kafka จากนั้นสร้างหัวข้อของคุณเองในโบรกเกอร์คาฟคาโดยใช้คำสั่งสร้างหัวข้อ หลังจากนั้นสร้างคลาส java ชื่อSim-pleProducer.java
แล้วพิมพ์ coding ต่อไปนี้
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}Compilation - สามารถคอมไพล์แอปพลิเคชันได้โดยใช้คำสั่งต่อไปนี้
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.javaExecution - แอปพลิเคชันสามารถดำเนินการได้โดยใช้คำสั่งต่อไปนี้
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>Output
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10ตัวอย่างผู้บริโภคอย่างง่าย
ณ ตอนนี้เราได้สร้างผู้ผลิตเพื่อส่งข้อความไปยังคลัสเตอร์ Kafka ตอนนี้ให้เราสร้างผู้บริโภคเพื่อบริโภคข้อความจากคลัสเตอร์ Kafka KafkaConsumer API ใช้เพื่อบริโภคข้อความจากคลัสเตอร์ Kafka ตัวสร้างคลาส KafkaConsumer ถูกกำหนดไว้ด้านล่าง
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)configs - ส่งคืนแผนที่ของการกำหนดค่าผู้บริโภค
คลาส KafkaConsumer มีวิธีการที่สำคัญดังต่อไปนี้ซึ่งแสดงไว้ในตารางด้านล่าง
| ส. เลขที่ | วิธีการและคำอธิบาย | 
|---|---|
| 1 | public java.util.Set<TopicPar-tition> assignment() รับชุดของพาร์ติชั่นที่ผู้ผลิตคอนซูเมอร์มอบหมาย | 
| 2 | public string subscription() สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก | 
| 3 | public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก | 
| 4 | public void unsubscribe() ยกเลิกการสมัครหัวข้อจากรายการพาร์ติชันที่กำหนด | 
| 5 | public void sub-scribe(java.util.List<java.lang.String> topics) สมัครสมาชิกรายการหัวข้อที่กำหนดเพื่อรับพาร์ติชันที่เซ็นชื่อแบบไดนามิก หากรายการหัวข้อที่ระบุว่างเปล่าจะถือว่าเหมือนกับการยกเลิกการสมัคร () | 
| 6 | public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) รูปแบบอาร์กิวเมนต์หมายถึงรูปแบบการสมัครสมาชิกในรูปแบบของนิพจน์ทั่วไปและอาร์กิวเมนต์ Listener จะได้รับการแจ้งเตือนจากรูปแบบการสมัครสมาชิก | 
| 7 | public void as-sign(java.util.List<TopicParti-tion> partitions) กำหนดรายการพาร์ติชันให้กับลูกค้าด้วยตนเอง | 
| 8 | poll() ดึงข้อมูลสำหรับหัวข้อหรือพาร์ติชันที่ระบุโดยใช้หนึ่งในสมัคร / กำหนด API สิ่งนี้จะส่งคืนข้อผิดพลาดหากไม่ได้สมัครหัวข้อก่อนการสำรวจข้อมูล | 
| 9 | public void commitSync() คอมมิตออฟเซ็ตที่ส่งคืนในแบบสำรวจล่าสุด () สำหรับรายการหัวข้อและพาร์ติชันย่อยที่เขียนไว้ทั้งหมด การดำเนินการเดียวกันนี้ถูกนำไปใช้กับคอมมิต Asyn () | 
| 10 | public void seek(TopicPartition partition, long offset) ดึงค่าชดเชยปัจจุบันที่ผู้บริโภคจะใช้ในวิธีการสำรวจความคิดเห็น () ถัดไป | 
| 11 | public void resume() ดำเนินการต่อพาร์ติชันที่หยุดชั่วคราว | 
| 12 | public void wakeup() ปลุกผู้บริโภค | 
ConsumerRecord API
ConsumerRecord API ใช้เพื่อรับเรกคอร์ดจากคลัสเตอร์ Kafka API นี้ประกอบด้วยชื่อหัวข้อหมายเลขพาร์ติชันซึ่งจะได้รับเร็กคอร์ดและออฟเซ็ตที่ชี้ไปยังเร็กคอร์ดในพาร์ติชัน Kafka คลาส ConsumerRecord ใช้เพื่อสร้างเรกคอร์ดผู้บริโภคที่มีชื่อหัวข้อเฉพาะจำนวนพาร์ติชันและคู่ <คีย์ค่า> มีลายเซ็นดังต่อไปนี้
public ConsumerRecord(string topic,int partition, long offset,K key, V value)- Topic - ชื่อหัวข้อสำหรับบันทึกผู้บริโภคที่ได้รับจากคลัสเตอร์ Kafka 
- Partition - พาร์ติชันสำหรับหัวข้อ 
- Key - คีย์ของเร็กคอร์ดหากไม่มีคีย์อยู่จะถูกส่งคืน 
- Value - บันทึกเนื้อหา 
ConsumerRecords API
ConsumerRecords API ทำหน้าที่เป็นคอนเทนเนอร์สำหรับ ConsumerRecord API นี้ใช้เพื่อเก็บรายการ ConsumerRecord ต่อพาร์ติชันสำหรับหัวข้อเฉพาะ ตัวสร้างของมันถูกกำหนดไว้ด้านล่าง
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)- TopicPartition - ส่งคืนแผนที่พาร์ติชันสำหรับหัวข้อใดหัวข้อหนึ่ง 
- Records - รายการส่งคืน ConsumerRecord 
คลาส ConsumerRecords มีการกำหนดวิธีการดังต่อไปนี้
| ส. เลขที่ | วิธีการและคำอธิบาย | 
|---|---|
| 1 | public int count() จำนวนบันทึกสำหรับหัวข้อทั้งหมด | 
| 2 | public Set partitions() ชุดของพาร์ติชันที่มีข้อมูลในชุดระเบียนนี้ (หากไม่มีการส่งคืนข้อมูลชุดนั้นจะว่างเปล่า) | 
| 3 | public Iterator iterator() Iterator ช่วยให้คุณสามารถวนรอบคอลเลกชันรับหรือย้ายองค์ประกอบได้ | 
| 4 | public List records() รับรายการบันทึกสำหรับพาร์ติชันที่กำหนด | 
การตั้งค่าการกำหนดค่า
การตั้งค่าคอนฟิกูเรชันสำหรับคอนฟิกูเรชันหลักของ Consumer Client API แสดงอยู่ด้านล่าง -
| ส. เลขที่ | การตั้งค่าและคำอธิบาย | 
|---|---|
| 1 | bootstrap.servers Bootstrapping รายชื่อโบรกเกอร์ | 
| 2 | group.id กำหนดผู้บริโภคแต่ละรายให้กับกลุ่ม | 
| 3 | enable.auto.commit เปิดใช้งานการคอมมิตอัตโนมัติสำหรับการชดเชยหากค่าเป็นจริงมิฉะนั้นจะไม่ถูกคอมมิต | 
| 4 | auto.commit.interval.ms ย้อนกลับว่าจะเขียนออฟเซ็ตที่บริโภคที่อัปเดตบ่อยเพียงใดใน ZooKeeper | 
| 5 | session.timeout.ms ระบุจำนวนมิลลิวินาทีที่ Kafka จะรอให้ ZooKeeper ตอบกลับคำขอ (อ่านหรือเขียน) ก่อนที่จะยอมแพ้และใช้ข้อความต่อไป | 
แอปพลิเคชัน SimpleConsumer
ขั้นตอนการสมัครผู้ผลิตยังคงเหมือนเดิมที่นี่ ขั้นแรกเริ่มนายหน้า ZooKeeper และ Kafka จากนั้นสร้างแอปพลิเคชันSimpleConsumer
ด้วยคลาส java ชื่อSimpleCon-sumer.java
และพิมพ์รหัสต่อไปนี้
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}Compilation - สามารถคอมไพล์แอปพลิเคชันได้โดยใช้คำสั่งต่อไปนี้
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.javaExecution − แอปพลิเคชันสามารถดำเนินการได้โดยใช้คำสั่งต่อไปนี้
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>Input- เปิดผู้ผลิต CLI และส่งข้อความไปยังหัวข้อ คุณสามารถใส่ smple input เป็น 'Hello Consumer'
Output - ต่อไปนี้จะเป็นผลลัพธ์
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer