Apache Kafka - Exemplo de produtor simples

Vamos criar um aplicativo para publicar e consumir mensagens usando um cliente Java. O cliente produtor Kafka consiste nas seguintes APIs.

API KafkaProducer

Vamos entender o conjunto mais importante de API do produtor Kafka nesta seção. A parte central da API KafkaProducer é a classe KafkaProducer . A classe KafkaProducer fornece uma opção para conectar um broker Kafka em seu construtor com os métodos a seguir.

  • A classe KafkaProducer fornece o método send para enviar mensagens de forma assíncrona para um tópico. A assinatura de send () é a seguinte

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - O produtor gerencia um buffer de registros aguardando para serem enviados.

  • Callback - Um retorno de chamada fornecido pelo usuário a ser executado quando o registro tiver sido reconhecido pelo servidor (nulo indica nenhum retorno de chamada).

  • A classe KafkaProducer fornece um método flush para garantir que todas as mensagens enviadas anteriormente foram realmente concluídas. A sintaxe do método flush é a seguinte -

public void flush()
  • A classe KafkaProducer fornece o método partitionFor, que ajuda a obter os metadados da partição para um determinado tópico. Isso pode ser usado para particionamento personalizado. A assinatura deste método é a seguinte -

public Map metrics()

Ele retorna o mapa de métricas internas mantidas pelo produtor.

  • public void close () - A classe KafkaProducer fornece blocos de método de fechamento até que todas as solicitações enviadas anteriormente sejam concluídas.

API do produtor

A parte central da API do Produtor é a classe Produtor . A classe do produtor fornece uma opção para conectar o broker Kafka em seu construtor pelos métodos a seguir.

A classe do produtor

A classe produtora fornece método de envio para send mensagens para um ou vários tópicos usando as seguintes assinaturas.

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

Existem dois tipos de produtores - Sync e Async.

A mesma configuração de API se aplica ao produtor de sincronização também. A diferença entre eles é que um produtor de sincronização envia mensagens diretamente, mas envia mensagens em segundo plano. O produtor assíncrono é preferível quando você deseja um rendimento mais alto. Nas versões anteriores como 0.8, um produtor assíncrono não tem um retorno de chamada para send () para registrar manipuladores de erro. Isso está disponível apenas na versão atual do 0.9.

public void close ()

A classe de produtor fornece close método para fechar as conexões do conjunto de produtores para todos os bro-kers Kafka.

Definições de configuração

As principais definições de configuração da API do Produtor estão listadas na tabela a seguir para melhor compreensão -

S.Não Definições de configuração e descrição
1

client.id

identifica a aplicação do produtor

2

producer.type

seja sincronizado ou assíncrono

3

acks

A configuração de acks controla os critérios em que as solicitações do produtor são consideradas concluídas.

4

retries

Se a solicitação do produtor falhar, tente novamente automaticamente com um valor específico.

5

bootstrap.servers

lista de bootstrapping de corretores.

6

linger.ms

se você deseja reduzir o número de solicitações, pode definir linger.ms para algo maior do que algum valor.

7

key.serializer

Chave para a interface do serializador.

8

value.serializer

valor para a interface do serializador.

9

batch.size

Tamanho do buffer.

10

buffer.memory

controla a quantidade total de memória disponível para o produtor para buffer.

API ProducerRecord

ProducerRecord é um par de chave / valor enviado ao construtor da classe Kafka cluster.ProducerRecord para criar um registro com pares de partição, chave e valor usando a assinatura a seguir.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - nome do tópico definido pelo usuário que será anexado ao registro.

  • Partition - contagem de partição

  • Key - A chave que será incluída no registro.

  • Value - Gravar conteúdos
public ProducerRecord (string topic, k key, v value)

O construtor da classe ProducerRecord é usado para criar um registro com chave, pares de valor e sem partição.

  • Topic - Crie um tópico para atribuir registro.

  • Key - chave para o registro.

  • Value - conteúdo do registro.

public ProducerRecord (string topic, v value)

A classe ProducerRecord cria um registro sem partição e chave.

  • Topic - crie um tópico.

  • Value - conteúdo do registro.

Os métodos da classe ProducerRecord estão listados na tabela a seguir -

S.Não Métodos de classe e descrição
1

public string topic()

O tópico será anexado ao registro.

2

public K key()

Chave que será incluída no registro. Se não houver essa chave, null será devolvido aqui.

3

public V value()

Conteúdo do registro.

4

partition()

Contagem de partição para o registro

Aplicativo SimpleProducer

Antes de criar o aplicativo, primeiro inicie o ZooKeeper e o corretor Kafka e, em seguida, crie seu próprio tópico no corretor Kafka usando o comando criar tópico. Depois disso, crie uma classe java chamada Sim-pleProducer.java e digite a seguinte codificação.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation - O aplicativo pode ser compilado usando o seguinte comando.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution - O aplicativo pode ser executado usando o seguinte comando.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Exemplo de consumidor simples

A partir de agora, criamos um produtor para enviar mensagens ao cluster Kafka. Agora, vamos criar um consumidor para consumir mensagens do cluster Kafka. A API KafkaConsumer é usada para consumir mensagens do cluster Kafka. O construtor da classe KafkaConsumer é definido a seguir.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - Retorne um mapa de configurações do consumidor.

A classe KafkaConsumer tem os seguintes métodos significativos listados na tabela a seguir.

S.Não Método e Descrição
1

public java.util.Set<TopicPar-tition> assignment()

Obtenha o conjunto de partições atualmente atribuídas pelo consumidor.

2

public string subscription()

Inscreva-se na lista de tópicos fornecida para obter partições atribuídas dinamicamente.

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

Inscreva-se na lista de tópicos fornecida para obter partições atribuídas dinamicamente.

4

public void unsubscribe()

Cancele a assinatura dos tópicos da lista de partições fornecida.

5

public void sub-scribe(java.util.List<java.lang.String> topics)

Inscreva-se na lista de tópicos fornecida para obter partições atribuídas dinamicamente. Se a lista de tópicos fornecida estiver vazia, ela será tratada da mesma forma que unsubscribe ().

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

O padrão de argumento refere-se ao padrão de assinatura no formato de expressão regular e o argumento do listener obtém notificações do padrão de assinatura.

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

Atribua manualmente uma lista de partições ao cliente.

8

poll()

Busque dados para os tópicos ou partições especificados usando uma das APIs de inscrição / atribuição. Isso retornará um erro, se os tópicos não forem inscritos antes da pesquisa de dados.

9

public void commitSync()

Comprometer offsets retornados na última enquete () para toda a lista de tópicos e partições subscritas. A mesma operação é aplicada a commitAsyn ().

10

public void seek(TopicPartition partition, long offset)

Busque o valor de deslocamento atual que o consumidor usará no próximo método poll ().

11

public void resume()

Retome as partições pausadas.

12

public void wakeup()

Desperte o consumidor.

API ConsumerRecord

A API ConsumerRecord é usada para receber registros do cluster Kafka. Esta API consiste em um nome de tópico, número de partição, a partir do qual o registro está sendo recebido e um deslocamento que aponta para o registro em uma partição Kafka. A classe ConsumerRecord é usada para criar um registro de consumidor com nome de tópico específico, contagem de partição e pares <chave, valor>. Possui a seguinte assinatura.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - O nome do tópico para registro do consumidor recebido do cluster Kafka.

  • Partition - Partição para o tópico.

  • Key - A chave do registro, se nenhuma chave existir, será retornado nulo.

  • Value - Grave conteúdos.

API ConsumerRecords

A API ConsumerRecords atua como um contêiner para ConsumerRecord. Esta API é usada para manter a lista de ConsumerRecord por partição para um tópico específico. Seu construtor é definido a seguir.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - Retorne um mapa de partição para um tópico específico.

  • Records - Lista de retorno de ConsumerRecord.

A classe ConsumerRecords possui os seguintes métodos definidos.

S.Não Métodos e Descrição
1

public int count()

O número de registros para todos os tópicos.

2

public Set partitions()

O conjunto de partições com dados neste conjunto de registros (se nenhum dado foi retornado, o conjunto está vazio).

3

public Iterator iterator()

Iterator permite que você percorra uma coleção, obtendo ou removendo elementos.

4

public List records()

Obtenha uma lista de registros para a partição fornecida.

Definições de configuração

As definições de configuração para as principais definições de configuração da API do cliente consumidor estão listadas abaixo -

S.Não Configurações e descrição
1

bootstrap.servers

Lista de bootstrapping de corretores.

2

group.id

Atribui um consumidor individual a um grupo.

3

enable.auto.commit

Habilite a confirmação automática para deslocamentos se o valor for verdadeiro, caso contrário, não confirmada.

4

auto.commit.interval.ms

Retorne com que frequência os deslocamentos consumidos atualizados são gravados no ZooKeeper.

5

session.timeout.ms

Indica quantos milissegundos o Kafka aguardará até que o ZooKeeper responda a uma solicitação (leitura ou gravação) antes de desistir e continuar a consumir mensagens.

Aplicativo SimpleConsumer

As etapas do aplicativo do produtor permanecem as mesmas aqui. Primeiro, inicie seu corretor ZooKeeper e Kafka. Em seguida, crie um aplicativo SimpleConsumer com a classe java chamada SimpleCon-sumer.java e digite o código a seguir.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - O aplicativo pode ser compilado usando o seguinte comando.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − O aplicativo pode ser executado usando o seguinte comando

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input- Abra a CLI do produtor e envie algumas mensagens para o tópico. Você pode colocar a entrada smple como 'Olá, consumidor'.

Output - A seguir será a saída.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer