Apache Kafka - Exemplo de grupo de consumidores

Grupo de consumidores é um consumo multi-threaded ou multi-máquina de tópicos Kafka.

Grupo de Consumidores

  • Os consumidores podem ingressar em um grupo usando o mesmo group.id.

  • O paralelismo máximo de um grupo é que o número de consumidores no grupo ← não de partições.

  • O Kafka atribui as partições de um tópico ao consumidor em um grupo, de modo que cada partição seja consumida por exatamente um consumidor no grupo.

  • Kafka garante que uma mensagem só é lida por um único consumidor no grupo.

  • Os consumidores podem ver a mensagem na ordem em que foram armazenadas no log.

Reequilíbrio de um consumidor

Adicionar mais processos / threads fará com que o Kafka se reequilibre. Se algum consumidor ou corretor falhar em enviar pulsação ao ZooKeeper, ele poderá ser reconfigurado por meio do cluster Kafka. Durante esse reequilíbrio, o Kafka atribuirá as partições disponíveis aos threads disponíveis, possivelmente movendo uma partição para outro processo.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Compilação

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Execução

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Aqui, criamos um nome de grupo de amostra como my-group com dois consumidores. Da mesma forma, você pode criar seu grupo e o número de consumidores no grupo.

Entrada

Abra a CLI do produtor e envie algumas mensagens como -

Test consumer group 01
Test consumer group 02

Saída do Primeiro Processo

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Saída do segundo processo

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Esperamos que você tenha entendido SimpleConsumer e ConsumeGroup usando a demonstração do cliente Java. Agora você tem uma ideia sobre como enviar e receber mensagens usando um cliente Java. Vamos continuar a integração do Kafka com tecnologias de big data no próximo capítulo.