Configurar e usar o Apache Kafka com Spring Boot

Introdução
O Apache Kafka é uma plataforma de streaming distribuído de código aberto que lida com streaming de dados em tempo real tolerante a falhas e de grande escala. O sistema é baseado em um modelo de publicação-assinatura em que os produtores publicam mensagens em tópicos e os consumidores assinam esses tópicos para consumir as mensagens. O caso de uso do Apache Kafka é visto em cenários como análises em tempo real, arquiteturas orientadas a eventos, agregação de logs, sistemas de mensagens e construção de pipelines de dados escalonáveis.
Uso deste documento:
Você pode usar este documento para configurar e usar o Apache Kafka para produzir e consumir mensagens (conteúdo JSON) em seu aplicativo de inicialização Spring.
Pré-requisitos
Você precisa de um bom entendimento de Java, spring-boot, Apache Kafka, maven ou Gradle antes de passar por este documento, caso contrário, sugerimos que você verifique a documentação e o guia oficiais.
Instalação
Antes de produzir mensagens para os tópicos do Kafka, você deve concluir algumas etapas para configurar as dependências necessárias em seu pom.xml(Maven) ou build.gradle(projeto Gradle)
Para Maven:
For Gradle
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
implementation ‘org.springframework.boot:spring-boot-starter’
implementation ‘org.springframework.kafka:spring-kafka’
Configuração
Configurando as propriedades Kafka para o arquivo Application.properties, você também pode ter suas propriedades personalizadas. Especifique os servidores de inicialização do servidor Kafka e quaisquer propriedades de configuração adicionais necessárias, como o ID do grupo de consumidores.
Application.properties
spring.kafka.bootstrap-servers=<kafka-bootstrap-servers>
spring.kafka.consumer.group-id=<consumer-group-id>
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=<topic-name>
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_category
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=my-topic
Um produtor Kafka é um componente que envia mensagens para tópicos Kafka. Ele publica dados no Kafka, que um ou mais consumidores do Kafka podem consumir.
Para criar um produtor Kafka, você precisa executar as seguintes etapas:
- Configurar as propriedades do produtor Kafka: Defina as propriedades de configuração necessárias para o produtor Kafka, como os servidores de bootstrap (endereços de agentes Kafka) e as configurações de serialização. Isso nós já realizamos na última etapa.
- Crie um produtor Kafka: Para enviar mensagens para Kafka; você pode usar o modelo fornecido pelo Spring Kafka. Aqui está um exemplo de um produtor simples:
- Configure as propriedades do produtor Kafka: Configure as propriedades Kafka em seu arquivo application.properties. Especifique os servidores de inicialização do servidor Kafka e quaisquer propriedades de configuração adicionais necessárias para o consumidor.
- Crie um ouvinte consumidor do Kafka: implemente um método em seu aplicativo que será chamado sempre que uma nova mensagem for recebida do Kafka. Use a anotação kafkaListener fornecida pelo Spring Kafka.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
public class KafkaEventProducer {
private static final Logger logger = LoggerFactory.getLogger(KafkaEventProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
logger.info("Producing message [{}]", message);
kafkaTemplate.send(topic, message);
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
public class KafkaEventConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaEventConsumer.class);
@KafkaListener(topics = “<topic-name>”, groupId = “<consumer-group-id>”)
public void consumeMessage(String message) {
//Use log
log.info("Consumed message [{}]", message);
}
}
@KafkaListener(topics = “${spring.kafka.app.topic}”,groupId= “${spring.kafka.consumer.group-id}”)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MainApplication {
@Autowired
private KafkaEventProducer kafkaEventProducer;
public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
public void YourMethod() {
// Send a message using the Kafka producer
kafkaEventProducer.sendMessage(“<topic-name>”, “Oh Kafka Boy How'z Everything?”);
}
}
Conclusão
Concluindo, o Spring Boot fornece excelente suporte para integrar o Apache Kafka em seus aplicativos. Com o suporte Kafka do Spring Boot, você pode facilmente criar produtores e consumidores Kafka, configurar propriedades Kafka e manipular o processamento de mensagens.
A integração do Spring Boot com o Kafka simplifica o desenvolvimento de aplicativos baseados em Kafka, fornecendo um nível mais alto de abstração e reduzindo a quantidade de código clichê necessário.
E antes de executar o código, verifique se o servidor Kafka está em execução e se os tópicos foram criados.
Nota: Lembre-se de consultar a documentação oficial do Spring Kafka e explorar os exemplos do Spring Kafka para entender melhor os vários recursos e opções disponíveis para a integração do Kafka no Spring Boot.
Obrigado!