Настройка и использование Apache Kafka с Spring Boot

May 13 2023
Кафка с весенним сапогом
Введение Apache Kafka — это платформа распределенной потоковой передачи с открытым исходным кодом, которая обеспечивает крупномасштабную, высокопроизводительную и отказоустойчивую потоковую передачу данных в режиме реального времени. Система основана на модели публикации-подписки, в которой производители публикуют сообщения в темах, а потребители подписываются на эти темы для использования сообщений.
Источник изображения

Введение

Apache Kafka — это распределенная платформа потоковой передачи с открытым исходным кодом, которая обеспечивает крупномасштабную, высокопроизводительную и отказоустойчивую потоковую передачу данных в режиме реального времени. Система основана на модели публикации-подписки, в которой производители публикуют сообщения в темах, а потребители подписываются на эти темы для использования сообщений. Вариант использования Apache Kafka можно увидеть в таких сценариях, как аналитика в реальном времени, архитектуры, управляемые событиями, агрегация журналов, системы обмена сообщениями и создание масштабируемых конвейеров данных.

Использование этого документа:

Вы можете использовать этот документ для настройки и использования Apache Kafka для создания и использования сообщений (контента JSON) в вашем приложении загрузки Spring.

Предпосылки

Вам нужно хорошо понимать Java, spring-boot, Apache Kafka, maven или Gradle, прежде чем читать этот документ, иначе вам будет предложено проверить их официальную документацию и руководство.

Монтаж

Прежде чем создавать сообщения в темах Kafka, вы должны выполнить несколько шагов, чтобы настроить необходимые зависимости в вашем pom.xml (Maven) или build.gradle (проект Gradle).

Для Мавена:

For Gradle 
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

implementation ‘org.springframework.boot:spring-boot-starter’
implementation ‘org.springframework.kafka:spring-kafka’

Конфигурация

Настроив свойства Kafka в файле Application.properties, вы также можете иметь свои собственные свойства. Укажите загрузочные серверы сервера Kafka и любые дополнительные свойства конфигурации, которые вам нужны, например идентификатор группы потребителей.

Application.properties

spring.kafka.bootstrap-servers=<kafka-bootstrap-servers> 
spring.kafka.consumer.group-id=<consumer-group-id>
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=<topic-name>


spring.kafka.bootstrap-servers=localhost:9092 
spring.kafka.consumer.group-id=group_category
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=my-topic

Производитель Kafka — это компонент, который отправляет сообщения в темы Kafka. Он публикует данные в Kafka, которые затем могут использовать один или несколько потребителей Kafka.

Чтобы создать производителя Kafka, вам необходимо выполнить следующие шаги:

  1. Настройте свойства производителя Kafka: настройте необходимые свойства конфигурации для производителя Kafka, такие как серверы начальной загрузки (адреса брокеров Kafka) и параметры сериализации. Это мы уже сделали на последнем шаге.
  2. Создайте производителя Kafka: для отправки сообщений в Kafka; вы можете использовать шаблон, предоставленный Spring Kafka. Вот пример простого производителя:
  3. import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Component
    public class KafkaEventProducer {
     
     private static final Logger logger = LoggerFactory.getLogger(KafkaEventProducer.class);
     
     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;
     
     public void sendMessage(String topic, String message) {
       logger.info("Producing message [{}]", message);
       kafkaTemplate.send(topic, message);
     }
    }
    

  4. Настройте свойства производителя Kafka: Настройте свойства Kafka в файле application.properties. Укажите загрузочные серверы сервера Kafka и любые дополнительные свойства конфигурации, необходимые для потребителя.
  5. Создайте потребительский слушатель Kafka: Реализуйте в своем приложении метод, который будет вызываться всякий раз, когда от Kafka будет получено новое сообщение. Используйте аннотацию kafkaListener, предоставленную Spring Kafka.
  6. import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Component
    public class KafkaEventConsumer {
    
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventConsumer.class);
    
    @KafkaListener(topics = “<topic-name>”, groupId = “<consumer-group-id>”)
     public void consumeMessage(String message) {
      //Use log
     log.info("Consumed message [{}]", message);
     }
    }
    
    

    @KafkaListener(topics = “${spring.kafka.app.topic}”,groupId= “${spring.kafka.consumer.group-id}”)
    

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MainApplication {

 @Autowired
 private KafkaEventProducer kafkaEventProducer;

 public static void main(String[] args) {
   SpringApplication.run(MainApplication.class, args);
 }
 
 public void YourMethod() {
 // Send a message using the Kafka producer
 kafkaEventProducer.sendMessage(“<topic-name>”, “Oh Kafka Boy How'z Everything?”);
 }
}

Заключение

В заключение, Spring Boot обеспечивает отличную поддержку для интеграции Apache Kafka в ваши приложения. Благодаря поддержке Spring Boot Kafka вы можете легко создавать производителей и потребителей Kafka, настраивать свойства Kafka и управлять обработкой сообщений.
Интеграция Spring Boot с Kafka упрощает разработку приложений на основе Kafka, обеспечивая более высокий уровень абстракции и уменьшая объем требуемого шаблонного кода.

И перед запуском кода убедитесь, что сервер Kafka запущен и темы созданы.

Примечание. Не забудьте обратиться к официальной документации Spring Kafka и изучить примеры Spring Kafka, чтобы лучше понять различные функции и параметры, доступные для интеграции Kafka в Spring Boot.

Спасибо!