Boot Musim Semi - Apache Kafka
Apache Kafka adalah proyek sumber terbuka yang digunakan untuk menerbitkan dan berlangganan pesan berdasarkan sistem pesan yang toleran terhadap kesalahan. Ini cepat, terukur, dan didistribusikan berdasarkan desain. Jika Anda seorang pemula di Kafka, atau ingin mendapatkan pemahaman yang lebih baik tentangnya, silakan merujuk ke tautan ini - www.tutorialspoint.com/apache_kafka/
Pada bab ini, kita akan melihat bagaimana mengimplementasikan Apache Kafka di aplikasi Spring Boot.
Pertama, kita perlu menambahkan dependensi Spring Kafka di file konfigurasi build kita.
Pengguna Maven dapat menambahkan dependensi berikut di file pom.xml.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
Pengguna Gradle dapat menambahkan dependensi berikut dalam file build.gradle.
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.0.RELEASE'
Memproduksi Pesan
Untuk menghasilkan pesan ke Apache Kafka, kita perlu mendefinisikan kelas Konfigurasi untuk konfigurasi Producer seperti yang ditunjukkan -
package com.tutorialspoint.kafkademo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Untuk mempublikasikan pesan, kawat otomatis objek Template Kafka dan buat pesan seperti yang ditunjukkan.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}
Mengkonsumsi Pesan
Untuk menggunakan pesan, kita perlu menulis file kelas konfigurasi Konsumen seperti yang ditunjukkan di bawah ini.
package com.tutorialspoint.kafkademo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Selanjutnya, tulis Pendengar untuk mendengarkan pesan.
@KafkaListener(topics = "tutorialspoint", groupId = "group-id")
public void listen(String message) {
System.out.println("Received Messasge in group - group-id: " + message);
}
Mari kita panggil metode sendMessage () dari metode menjalankan kelas ApplicationRunner dari file kelas aplikasi Spring Boot utama dan mengonsumsi pesan dari file kelas yang sama.
Kode file kelas aplikasi Spring Boot utama Anda diberikan di bawah ini -
package com.tutorialspoint.kafkademo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class KafkaDemoApplication implements ApplicationRunner {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send("tutorialspoint", msg);
}
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@KafkaListener(topics = "tutorialspoint", groupId = "group-id")
public void listen(String message) {
System.out.println("Received Messasge in group - group-id: " + message);
}
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessage("Hi Welcome to Spring For Apache Kafka");
}
}
Kode untuk file konfigurasi build lengkap diberikan di bawah ini.
Maven – pom.xml
<?xml version = "1.0" encoding = "UTF-8"?>
<project xmlns = "http://maven.apache.org/POM/4.0.0"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tutorialspoint</groupId>
<artifactId>kafka-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Gradle – build.gradle
buildscript {
ext {
springBootVersion = '1.5.9.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
group = 'com.tutorialspoint'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter')
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.0.RELEASE'
testCompile('org.springframework.boot:spring-boot-starter-test')
}
Sekarang, buat file JAR yang dapat dieksekusi, dan jalankan aplikasi Spring Boot dengan menggunakan perintah Maven atau Gradle di bawah ini seperti yang ditunjukkan -
Untuk Maven, gunakan perintah seperti yang ditunjukkan -
mvn clean install
Setelah “BUILD SUCCESS”, Anda dapat menemukan file JAR di bawah direktori target.
Untuk Gradle, gunakan perintah seperti yang ditunjukkan -
gradle clean build
Setelah "BUILD SUCCESSFUL", Anda dapat menemukan file JAR di bawah direktori build / libs.
Jalankan file JAR dengan menggunakan perintah yang diberikan di sini -
java –jar <JARFILE>
Anda dapat melihat hasilnya di jendela konsol.