स्प्रिंग बूट - अपाचे काफ्का
अपाचे काफ्का एक ओपन सोर्स प्रोजेक्ट है जिसका इस्तेमाल गलती-सहिष्णु मैसेजिंग सिस्टम पर आधारित संदेशों को प्रकाशित और सब्सक्राइब करने के लिए किया जाता है। यह तेज, स्केलेबल है और डिजाइन द्वारा वितरित किया गया है। यदि आप काफ्का के लिए एक शुरुआत कर रहे हैं, या इस पर एक बेहतर समझ हासिल करना चाहते हैं, तो कृपया इस लिंक को देखें - www.tutorialspoint.com/apache_kafka/
इस अध्याय में, हम यह देखने जा रहे हैं कि स्प्रिंग बूट एप्लिकेशन में अपाचे काफ्का को कैसे लागू किया जाए।
सबसे पहले, हमें अपनी बिल्ड कॉन्फ़िगरेशन फ़ाइल में स्प्रिंग काफ्का निर्भरता जोड़ने की आवश्यकता है।
Maven उपयोगकर्ता pom.xml फ़ाइल में निम्न निर्भरता जोड़ सकते हैं।
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
Gradle उपयोगकर्ता build.gradle फ़ाइल में निम्न निर्भरता जोड़ सकते हैं।
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.0.RELEASE'
संदेश उत्पन्न करना
अपाचे काफ्का में संदेशों का उत्पादन करने के लिए, हमें निर्माता विन्यास के लिए विन्यास वर्ग को परिभाषित करने की आवश्यकता है -
package com.tutorialspoint.kafkademo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
एक संदेश प्रकाशित करने के लिए, काफ्का टेम्पलेट ऑब्जेक्ट को ऑटो तार करें और दिखाए गए अनुसार संदेश का उत्पादन करें।
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}
एक संदेश का उपभोग करना
संदेशों का उपभोग करने के लिए, हमें नीचे दिखाए गए अनुसार एक उपभोक्ता कॉन्फ़िगरेशन क्लास फ़ाइल लिखने की आवश्यकता है।
package com.tutorialspoint.kafkademo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
अगला, संदेशों को सुनने के लिए एक श्रोता लिखें।
@KafkaListener(topics = "tutorialspoint", groupId = "group-id")
public void listen(String message) {
System.out.println("Received Messasge in group - group-id: " + message);
}
हमें मुख्य स्प्रिंग बूट एप्लिकेशन क्लास फ़ाइल से ApplicationRunner वर्ग रन विधि से sendMessage () विधि को कॉल करें और उसी वर्ग फ़ाइल से संदेश का उपभोग करें।
आपका मुख्य स्प्रिंग बूट एप्लिकेशन क्लास फ़ाइल कोड नीचे दिया गया है -
package com.tutorialspoint.kafkademo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class KafkaDemoApplication implements ApplicationRunner {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send("tutorialspoint", msg);
}
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@KafkaListener(topics = "tutorialspoint", groupId = "group-id")
public void listen(String message) {
System.out.println("Received Messasge in group - group-id: " + message);
}
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessage("Hi Welcome to Spring For Apache Kafka");
}
}
पूर्ण बिल्ड कॉन्फ़िगरेशन फ़ाइल के लिए कोड नीचे दिया गया है।
Maven – pom.xml
<?xml version = "1.0" encoding = "UTF-8"?>
<project xmlns = "http://maven.apache.org/POM/4.0.0"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tutorialspoint</groupId>
<artifactId>kafka-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Gradle – build.gradle
buildscript {
ext {
springBootVersion = '1.5.9.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
group = 'com.tutorialspoint'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter')
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.0.RELEASE'
testCompile('org.springframework.boot:spring-boot-starter-test')
}
अब, एक निष्पादन योग्य JAR फ़ाइल बनाएँ, और नीचे दिए गए मावेन या ग्रैडल कमांड का उपयोग करके स्प्रिंग बूट एप्लिकेशन चलाएं -
मावेन के लिए, दिखाए गए अनुसार कमांड का उपयोग करें -
mvn clean install
"बिल्ड सफलता" के बाद, आप लक्ष्य निर्देशिका के तहत जार फ़ाइल पा सकते हैं।
ग्रेडल के लिए, कमांड को दिखाए अनुसार उपयोग करें -
gradle clean build
“BUILD SUCCESSFUL” के बाद, आप JAR फाइल को बिल्ड / लिबास डायरेक्टरी के तहत पा सकते हैं।
यहां दिए गए कमांड का उपयोग करके JAR फ़ाइल चलाएँ -
java –jar <JARFILE>
आप कंसोल विंडो में आउटपुट देख सकते हैं।