Apache Kafka - Ví dụ về nhóm người tiêu dùng

Nhóm người tiêu dùng là nhóm tiêu dùng đa luồng hoặc nhiều máy từ các chủ đề của Kafka.

Nhóm người tiêu dùng

  • Người tiêu dùng có thể tham gia một nhóm bằng cách sử dụng cùng một group.id.

  • Mức độ song song tối đa của một nhóm là số lượng người tiêu dùng trong nhóm ← không có phân vùng.

  • Kafka chỉ định các phân vùng của một chủ đề cho người tiêu dùng trong một nhóm, để mỗi phân vùng được sử dụng bởi chính xác một người tiêu dùng trong nhóm.

  • Kafka đảm bảo rằng một tin nhắn chỉ được đọc bởi một người tiêu dùng duy nhất trong nhóm.

  • Người tiêu dùng có thể xem thông báo theo thứ tự được lưu trong nhật ký.

Tái cân bằng của một người tiêu dùng

Thêm nhiều quy trình / luồng sẽ khiến Kafka cân bằng lại. Nếu bất kỳ người tiêu dùng hoặc nhà môi giới nào không gửi được nhịp tim tới ZooKeeper, thì nó có thể được cấu hình lại thông qua cụm Kafka. Trong quá trình cân bằng lại này, Kafka sẽ gán các phân vùng có sẵn cho các luồng có sẵn, có thể di chuyển một phân vùng sang một quy trình khác.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Tổng hợp

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Chấp hành

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Ở đây chúng tôi đã tạo một tên nhóm mẫu là nhóm của tôi với hai người tiêu dùng. Tương tự, bạn có thể tạo nhóm của mình và số lượng người tiêu dùng trong nhóm.

Đầu vào

Mở CLI của nhà sản xuất và gửi một số tin nhắn như -

Test consumer group 01
Test consumer group 02

Đầu ra của quy trình đầu tiên

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Đầu ra của quy trình thứ hai

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Bây giờ hy vọng bạn đã hiểu SimpleConsumer và ConsumeGroup bằng cách sử dụng bản trình diễn ứng dụng khách Java. Bây giờ bạn có ý tưởng về cách gửi và nhận tin nhắn bằng máy khách Java. Hãy để chúng tôi tiếp tục tích hợp Kafka với công nghệ dữ liệu lớn trong chương tiếp theo.