Spring Boot での Apache Kafka の構成と使用

May 13 2023
スプリングブーツを履いたカフカ
はじめに Apache Kafka は、大規模で高スループット、フォールトトレラントなリアルタイム データ ストリーミングを処理するオープンソースの分散ストリーミング プラットフォームです。このシステムは、プロデューサーがトピックにメッセージをパブリッシュし、コンシューマーがそれらのトピックをサブスクライブしてメッセージを消費するパブリッシュ/サブスクライブ モデルに基づいています。
画像ソース

序章

Apache Kafka は、大規模で高スループット、フォールトトレラントなリアルタイム データ ストリーミングを処理するオープンソースの分散ストリーミング プラットフォームです。このシステムは、プロデューサーがトピックにメッセージをパブリッシュし、コンシューマーがそれらのトピックをサブスクライブしてメッセージを消費するパブリッシュ/サブスクライブ モデルに基づいています。Apache Kafka のユースケースは、リアルタイム分析、イベント駆動型アーキテクチャ、ログ集約、メッセージング システム、スケーラブルなデータ パイプラインの構築などのシナリオに見られます。

このドキュメントの使用法:

このドキュメントを使用して、Apache Kafka を構成および使用し、Spring Boot アプリケーションでメッセージ (JSON コンテンツ) を生成および消費できます。

前提条件

このドキュメントを読む前に、Java、spring-boot、Apache Kafka、maven、または Gradle について十分に理解する必要があります。そうでない場合は、公式ドキュメントとガイドを確認することをお勧めします。

インストール

Kafka トピックへのメッセージを生成する前に、pom.xml (Maven) または build.gradle (Gradle プロジェクト) で必要な依存関係を構成するためのいくつかの手順を完了する必要があります。

メイブンの場合:

For Gradle 
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

implementation ‘org.springframework.boot:spring-boot-starter’
implementation ‘org.springframework.kafka:spring-kafka’

構成

Kafka プロパティを Application.properties ファイルに構成すると、カスタム プロパティも含めることができます。Kafka サーバーのブートストラップ サーバーと、コンシューマー グループ ID などの必要な追加の構成プロパティを指定します。

アプリケーションのプロパティ

spring.kafka.bootstrap-servers=<kafka-bootstrap-servers> 
spring.kafka.consumer.group-id=<consumer-group-id>
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=<topic-name>


spring.kafka.bootstrap-servers=localhost:9092 
spring.kafka.consumer.group-id=group_category
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.app.topic=my-topic

Kafka プロデューサーは、Kafka トピックにメッセージを送信するコンポーネントです。データを Kafka にパブリッシュし、1 つ以上の Kafka コンシューマーがそのデータを使用できるようにします。

Kafka プロデューサーを作成するには、次の手順を実行する必要があります。

  1. Kafka プロデューサーのプロパティを構成する:ブートストラップ サーバー (Kafka ブローカーのアドレス) やシリアル化設定など、Kafka プロデューサーに必要な構成プロパティをセットアップします。これは最後のステップですでに実行済みです。
  2. Kafka プロデューサーを作成します。Kafkaにメッセージを送信します。Spring Kafka が提供するテンプレートを使用できます。単純なプロデューサーの例を次に示します。
  3. import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Component
    public class KafkaEventProducer {
     
     private static final Logger logger = LoggerFactory.getLogger(KafkaEventProducer.class);
     
     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;
     
     public void sendMessage(String topic, String message) {
       logger.info("Producing message [{}]", message);
       kafkaTemplate.send(topic, message);
     }
    }
    

  4. Kafka プロデューサーのプロパティを構成する: application.properties ファイルで Kafka プロパティを構成します。Kafka サーバーのブートストラップ サーバーと、コンシューマーに必要な追加の構成プロパティを指定します。
  5. Kafka コンシューマー リスナーを作成する: Kafka から新しいメッセージを受信するたびに呼び出されるメソッドをアプリケーションに実装します。Spring Kafka によって提供される kafkaListener アノテーションを使用します。
  6. import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Component
    public class KafkaEventConsumer {
    
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventConsumer.class);
    
    @KafkaListener(topics = “<topic-name>”, groupId = “<consumer-group-id>”)
     public void consumeMessage(String message) {
      //Use log
     log.info("Consumed message [{}]", message);
     }
    }
    
    

    @KafkaListener(topics = “${spring.kafka.app.topic}”,groupId= “${spring.kafka.consumer.group-id}”)
    

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MainApplication {

 @Autowired
 private KafkaEventProducer kafkaEventProducer;

 public static void main(String[] args) {
   SpringApplication.run(MainApplication.class, args);
 }
 
 public void YourMethod() {
 // Send a message using the Kafka producer
 kafkaEventProducer.sendMessage(“<topic-name>”, “Oh Kafka Boy How'z Everything?”);
 }
}

結論

結論として、Spring Boot は、Apache Kafka をアプリケーションに統合するための優れたサポートを提供します。Spring Boot の Kafka サポートを使用すると、Kafka プロデューサとコンシューマを簡単に作成し、Kafka プロパティを構成し、メッセージ処理を処理できます。
Spring Boot と Kafka の統合により、Kafka ベースのアプリケーションの開発が簡素化され、より高いレベルの抽象化が提供され、必要な定型コードの量が削減されます。

コードを実行する前に、Kafka サーバーが実行中であり、トピックが作成されていることを確認してください。

注: Spring Kafka の公式ドキュメントを必ず参照し、Spring Kafka サンプルを調べて、Spring Boot での Kafka 統合に利用できるさまざまな機能とオプションをよりよく理解してください。

ありがとう!