Apache Kafka - Ví dụ về nhóm người tiêu dùng
Nhóm người tiêu dùng là nhóm tiêu dùng đa luồng hoặc nhiều máy từ các chủ đề của Kafka.
Nhóm người tiêu dùng
Người tiêu dùng có thể tham gia một nhóm bằng cách sử dụng cùng một
group.id.
Mức độ song song tối đa của một nhóm là số lượng người tiêu dùng trong nhóm ← không có phân vùng.
Kafka chỉ định các phân vùng của một chủ đề cho người tiêu dùng trong một nhóm, để mỗi phân vùng được sử dụng bởi chính xác một người tiêu dùng trong nhóm.
Kafka đảm bảo rằng một tin nhắn chỉ được đọc bởi một người tiêu dùng duy nhất trong nhóm.
Người tiêu dùng có thể xem thông báo theo thứ tự được lưu trong nhật ký.
Tái cân bằng của một người tiêu dùng
Thêm nhiều quy trình / luồng sẽ khiến Kafka cân bằng lại. Nếu bất kỳ người tiêu dùng hoặc nhà môi giới nào không gửi được nhịp tim tới ZooKeeper, thì nó có thể được cấu hình lại thông qua cụm Kafka. Trong quá trình cân bằng lại này, Kafka sẽ gán các phân vùng có sẵn cho các luồng có sẵn, có thể di chuyển một phân vùng sang một quy trình khác.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Tổng hợp
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
Chấp hành
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
Ở đây chúng tôi đã tạo một tên nhóm mẫu là nhóm của tôi
với hai người tiêu dùng. Tương tự, bạn có thể tạo nhóm của mình và số lượng người tiêu dùng trong nhóm.
Đầu vào
Mở CLI của nhà sản xuất và gửi một số tin nhắn như -
Test consumer group 01
Test consumer group 02
Đầu ra của quy trình đầu tiên
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
Đầu ra của quy trình thứ hai
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
Bây giờ hy vọng bạn đã hiểu SimpleConsumer và ConsumeGroup bằng cách sử dụng bản trình diễn ứng dụng khách Java. Bây giờ bạn có ý tưởng về cách gửi và nhận tin nhắn bằng máy khách Java. Hãy để chúng tôi tiếp tục tích hợp Kafka với công nghệ dữ liệu lớn trong chương tiếp theo.