Skonfiguruj i używaj Apache Kafka z Spring Boot

May 13 2023
Kafka z wiosennym butem
Wprowadzenie Apache Kafka to rozproszona platforma przesyłania strumieniowego typu open source, która obsługuje przesyłanie strumieniowe danych w czasie rzeczywistym na dużą skalę, o dużej przepustowości i odporności na błędy. System opiera się na modelu publikowania-subskrybowania, w którym producenci publikują wiadomości w tematach, a konsumenci subskrybują te tematy, aby konsumować wiadomości.
Źródło obrazu

Wstęp

Apache Kafka to rozproszona platforma przesyłania strumieniowego typu open source, która obsługuje przesyłanie strumieniowe danych w czasie rzeczywistym o dużej przepustowości i odporności na błędy. System opiera się na modelu publikowania-subskrybowania, w którym producenci publikują wiadomości w tematach, a konsumenci subskrybują te tematy, aby konsumować wiadomości. Przypadek użycia Apache Kafka jest widoczny w scenariuszach takich jak analityka w czasie rzeczywistym, architektury sterowane zdarzeniami, agregacja logów, systemy przesyłania wiadomości i budowanie skalowalnych potoków danych.

Użycie tego dokumentu:

Możesz użyć tego dokumentu, aby skonfigurować i używać Apache Kafka do tworzenia i używania komunikatów (zawartość JSON) w aplikacji Spring Boot.

Wymagania wstępne

Zanim przejdziesz do tego dokumentu, potrzebujesz dobrej znajomości Javy, spring-boot, Apache Kafka, maven lub Gradle, w przeciwnym razie zasugerujemy sprawdzenie ich oficjalnej dokumentacji i przewodnika.

Instalacja

Przed utworzeniem komunikatów do tematów Kafki należy wykonać kilka kroków, aby skonfigurować wymagane zależności w pom.xml (Maven) lub build.gradle (projekt Gradle)

Dla Mavena:

For Gradle 
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

implementation ‘org.springframework.boot:spring-boot-starter’
implementation ‘org.springframework.kafka:spring-kafka’

Konfiguracja

Konfigurując właściwości Kafki w pliku Application.properties, możesz mieć również własne właściwości. Określ serwery ładowania początkowego serwera Kafka i wszelkie dodatkowe potrzebne właściwości konfiguracyjne, takie jak identyfikator grupy konsumentów.

Właściwości.aplikacji

spring.kafka.bootstrap-servers=<kafka-bootstrap-servers> 
spring.kafka.consumer.group-id=<consumer-group-id>
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=<topic-name>


spring.kafka.bootstrap-servers=localhost:9092 
spring.kafka.consumer.group-id=group_category
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=my-topic

Producent Kafki to komponent, który wysyła komunikaty do tematów Kafki. Publikuje dane w Kafce, które następnie może wykorzystać jeden lub więcej konsumentów Kafki.

Aby utworzyć producenta Kafki, musisz wykonać następujące kroki:

  1. Skonfiguruj właściwości producenta Kafki: Skonfiguruj niezbędne właściwości konfiguracyjne dla producenta Kafki, takie jak serwery ładowania początkowego (adresy brokerów Kafki) i ustawienia serializacji. To już wykonaliśmy w ostatnim kroku.
  2. Utwórz producenta Kafki: Aby wysyłać wiadomości do Kafki; możesz skorzystać z szablonu udostępnionego przez Spring Kafka. Oto przykład prostego producenta:
  3. import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Component
    public class KafkaEventProducer {
     
     private static final Logger logger = LoggerFactory.getLogger(KafkaEventProducer.class);
     
     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;
     
     public void sendMessage(String topic, String message) {
       logger.info("Producing message [{}]", message);
       kafkaTemplate.send(topic, message);
     }
    }
    

  4. Skonfiguruj właściwości producenta platformy Kafka: Skonfiguruj właściwości platformy Kafka w pliku application.properties. Określ serwery ładowania początkowego serwera Kafka i wszelkie dodatkowe właściwości konfiguracyjne potrzebne konsumentowi.
  5. Stwórz detektor konsumenta Kafki: Zaimplementuj w swojej aplikacji metodę, która będzie wywoływana za każdym razem, gdy od Kafki zostanie odebrana nowa wiadomość. Użyj adnotacji kafkaListener dostarczonej przez Spring Kafka.
  6. import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Component
    public class KafkaEventConsumer {
    
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventConsumer.class);
    
    @KafkaListener(topics = “<topic-name>”, groupId = “<consumer-group-id>”)
     public void consumeMessage(String message) {
      //Use log
     log.info("Consumed message [{}]", message);
     }
    }
    
    

    @KafkaListener(topics = “${spring.kafka.app.topic}”,groupId= “${spring.kafka.consumer.group-id}”)
    

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MainApplication {

 @Autowired
 private KafkaEventProducer kafkaEventProducer;

 public static void main(String[] args) {
   SpringApplication.run(MainApplication.class, args);
 }
 
 public void YourMethod() {
 // Send a message using the Kafka producer
 kafkaEventProducer.sendMessage(“<topic-name>”, “Oh Kafka Boy How'z Everything?”);
 }
}

Wniosek

Podsumowując, Spring Boot zapewnia doskonałe wsparcie dla integracji Apache Kafka z twoimi aplikacjami. Dzięki obsłudze Kafka w Spring Boot możesz łatwo tworzyć producentów i konsumentów Kafka, konfigurować właściwości Kafki i obsługiwać przetwarzanie komunikatów.
Integracja Spring Boot z Kafką upraszcza tworzenie aplikacji opartych na Kafce, zapewniając wyższy poziom abstrakcji i zmniejszając ilość wymaganego kodu szablonowego.

A przed uruchomieniem kodu upewnij się, że serwer Kafki jest uruchomiony, a tematy zostały utworzone.

Uwaga: Pamiętaj, aby zapoznać się z oficjalną dokumentacją Spring Kafka i zapoznać się z przykładami Spring Kafka, aby lepiej zrozumieć różne funkcje i opcje dostępne dla integracji Kafka w Spring Boot.

Dziękuję!