Настройка и использование Apache Kafka с Spring Boot
Введение
Apache Kafka — это распределенная платформа потоковой передачи с открытым исходным кодом, которая обеспечивает крупномасштабную, высокопроизводительную и отказоустойчивую потоковую передачу данных в режиме реального времени. Система основана на модели публикации-подписки, в которой производители публикуют сообщения в темах, а потребители подписываются на эти темы для использования сообщений. Вариант использования Apache Kafka можно увидеть в таких сценариях, как аналитика в реальном времени, архитектуры, управляемые событиями, агрегация журналов, системы обмена сообщениями и создание масштабируемых конвейеров данных.
Использование этого документа:
Вы можете использовать этот документ для настройки и использования Apache Kafka для создания и использования сообщений (контента JSON) в вашем приложении загрузки Spring.
Предпосылки
Вам нужно хорошо понимать Java, spring-boot, Apache Kafka, maven или Gradle, прежде чем читать этот документ, иначе вам будет предложено проверить их официальную документацию и руководство.
Монтаж
Прежде чем создавать сообщения в темах Kafka, вы должны выполнить несколько шагов, чтобы настроить необходимые зависимости в вашем pom.xml (Maven) или build.gradle (проект Gradle).
Для Мавена:
For Gradle
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
implementation ‘org.springframework.boot:spring-boot-starter’
implementation ‘org.springframework.kafka:spring-kafka’
Конфигурация
Настроив свойства Kafka в файле Application.properties, вы также можете иметь свои собственные свойства. Укажите загрузочные серверы сервера Kafka и любые дополнительные свойства конфигурации, которые вам нужны, например идентификатор группы потребителей.
Application.properties
spring.kafka.bootstrap-servers=<kafka-bootstrap-servers>
spring.kafka.consumer.group-id=<consumer-group-id>
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=<topic-name>
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_category
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=my-topic
Производитель Kafka — это компонент, который отправляет сообщения в темы Kafka. Он публикует данные в Kafka, которые затем могут использовать один или несколько потребителей Kafka.
Чтобы создать производителя Kafka, вам необходимо выполнить следующие шаги:
- Настройте свойства производителя Kafka: настройте необходимые свойства конфигурации для производителя Kafka, такие как серверы начальной загрузки (адреса брокеров Kafka) и параметры сериализации. Это мы уже сделали на последнем шаге.
- Создайте производителя Kafka: для отправки сообщений в Kafka; вы можете использовать шаблон, предоставленный Spring Kafka. Вот пример простого производителя:
- Настройте свойства производителя Kafka: Настройте свойства Kafka в файле application.properties. Укажите загрузочные серверы сервера Kafka и любые дополнительные свойства конфигурации, необходимые для потребителя.
- Создайте потребительский слушатель Kafka: Реализуйте в своем приложении метод, который будет вызываться всякий раз, когда от Kafka будет получено новое сообщение. Используйте аннотацию kafkaListener, предоставленную Spring Kafka.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
public class KafkaEventProducer {
private static final Logger logger = LoggerFactory.getLogger(KafkaEventProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
logger.info("Producing message [{}]", message);
kafkaTemplate.send(topic, message);
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
public class KafkaEventConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaEventConsumer.class);
@KafkaListener(topics = “<topic-name>”, groupId = “<consumer-group-id>”)
public void consumeMessage(String message) {
//Use log
log.info("Consumed message [{}]", message);
}
}
@KafkaListener(topics = “${spring.kafka.app.topic}”,groupId= “${spring.kafka.consumer.group-id}”)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MainApplication {
@Autowired
private KafkaEventProducer kafkaEventProducer;
public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
public void YourMethod() {
// Send a message using the Kafka producer
kafkaEventProducer.sendMessage(“<topic-name>”, “Oh Kafka Boy How'z Everything?”);
}
}
Заключение
В заключение, Spring Boot обеспечивает отличную поддержку для интеграции Apache Kafka в ваши приложения. Благодаря поддержке Spring Boot Kafka вы можете легко создавать производителей и потребителей Kafka, настраивать свойства Kafka и управлять обработкой сообщений.
Интеграция Spring Boot с Kafka упрощает разработку приложений на основе Kafka, обеспечивая более высокий уровень абстракции и уменьшая объем требуемого шаблонного кода.
И перед запуском кода убедитесь, что сервер Kafka запущен и темы созданы.
Примечание. Не забудьте обратиться к официальной документации Spring Kafka и изучить примеры Spring Kafka, чтобы лучше понять различные функции и параметры, доступные для интеграции Kafka в Spring Boot.
Спасибо!