Apache Kafka - Contoh Produser Sederhana

Mari kita buat aplikasi untuk menerbitkan dan menggunakan pesan menggunakan klien Java. Klien produser Kafka terdiri dari API berikut.

API KafkaProducer

Mari kita pahami kumpulan API produsen Kafka yang paling penting di bagian ini. Bagian utama dari KafkaProducer API adalah kelas KafkaProducer . Kelas KafkaProducer menyediakan opsi untuk menghubungkan broker Kafka dalam konstruktornya dengan metode berikut.

  • Kelas KafkaProducer menyediakan metode kirim untuk mengirim pesan secara asinkron ke suatu topik. Tanda tangan send () adalah sebagai berikut

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - Produser mengelola penyangga rekaman yang menunggu untuk dikirim.

  • Callback - Callback yang disediakan pengguna untuk dieksekusi ketika record telah diakui oleh server (null menunjukkan tidak ada callback).

  • Kelas KafkaProducer menyediakan metode flush untuk memastikan semua pesan yang dikirim sebelumnya telah benar-benar diselesaikan. Sintaks dari metode flush adalah sebagai berikut -

public void flush()
  • Kelas KafkaProducer menyediakan metode partitionFor, yang membantu mendapatkan metadata partisi untuk topik tertentu. Ini dapat digunakan untuk partisi kustom. Tanda tangan metode ini adalah sebagai berikut -

public Map metrics()

Ini mengembalikan peta metrik internal yang dikelola oleh produsen.

  • public void close () - kelas KafkaProducer menyediakan blok metode tertutup sampai semua permintaan yang dikirim sebelumnya selesai.

API Produsen

Bagian utama dari API Produser adalah kelas Produser . Kelas produsen menyediakan opsi untuk menghubungkan broker Kafka di konstruktornya dengan metode berikut.

Kelas Produser

Kelas produser menyediakan metode kirim ke send pesan ke satu atau beberapa topik menggunakan tanda tangan berikut.

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

Ada dua jenis produsen - Sync dan Async.

Konfigurasi API yang sama juga berlaku untuk produsen sinkronisasi . Perbedaan di antara keduanya adalah produsen sinkronisasi mengirim pesan secara langsung, tetapi mengirim pesan di latar belakang. Produser asinkron lebih disukai jika Anda menginginkan throughput yang lebih tinggi. Dalam rilis sebelumnya seperti 0.8, produser asinkron tidak memiliki callback untuk send () untuk mendaftarkan penangan kesalahan. Ini hanya tersedia di rilis saat ini 0,9.

public void close ()

Kelas produser menyediakan close metode untuk menutup koneksi kumpulan produsen ke semua Kafka bro-kers.

Pengaturan konfigurasi

Pengaturan konfigurasi utama Producer API tercantum dalam tabel berikut untuk pemahaman yang lebih baik -

S.No Pengaturan dan Deskripsi Konfigurasi
1

client.id

mengidentifikasi aplikasi produsen

2

producer.type

baik sinkronisasi atau asinkron

3

acks

Konfigurasi acks mengontrol kriteria berdasarkan permintaan produsen yang dianggap lengkap.

4

retries

Jika permintaan produsen gagal, maka secara otomatis coba lagi dengan nilai tertentu.

5

bootstrap.servers

daftar bootstrap pialang.

6

linger.ms

jika Anda ingin mengurangi jumlah permintaan, Anda dapat mengatur linger.ms menjadi sesuatu yang lebih besar dari nilai tertentu.

7

key.serializer

Kunci untuk antarmuka serializer.

8

value.serializer

nilai untuk antarmuka serializer.

9

batch.size

Ukuran buffer.

10

buffer.memory

mengontrol jumlah total memori yang tersedia bagi produsen untuk buff-ering.

ProducerRecord API

ProducerRecord adalah pasangan kunci / nilai yang dikirim ke kluster Kafka. Konstruktor kelas ProducerRecord untuk membuat rekaman dengan pasangan partisi, kunci, dan nilai menggunakan tanda tangan berikut.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - nama topik yang ditentukan pengguna yang akan ditambahkan ke rekaman.

  • Partition - jumlah partisi

  • Key - Kunci yang akan dimasukkan ke dalam rekaman.

  • Value - Rekam isinya
public ProducerRecord (string topic, k key, v value)

Konstruktor kelas ProducerRecord digunakan untuk membuat record dengan kunci, pasangan nilai, dan tanpa partisi.

  • Topic - Buat topik untuk menetapkan catatan.

  • Key - kunci catatan.

  • Value - merekam konten.

public ProducerRecord (string topic, v value)

Kelas ProducerRecord membuat record tanpa partisi dan kunci.

  • Topic - buat topik.

  • Value - merekam konten.

Metode kelas ProducerRecord tercantum dalam tabel berikut -

S.No Metode dan Deskripsi Kelas
1

public string topic()

Topik akan ditambahkan ke rekaman.

2

public K key()

Kunci yang akan dimasukkan ke dalam catatan. Jika tidak ada kunci seperti itu, nol akan diaktifkan kembali di sini.

3

public V value()

Rekam isinya.

4

partition()

Hitungan partisi untuk catatan

Aplikasi SimpleProducer

Sebelum membuat aplikasi, pertama mulai Zookeeper dan broker Kafka kemudian buat topik Anda sendiri di broker Kafka menggunakan perintah buat topik. Setelah itu buat kelas java bernama Sim-pleProducer.java dan ketikkan koding berikut.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation - Aplikasi dapat dikompilasi menggunakan perintah berikut.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution - Aplikasi dapat dijalankan dengan menggunakan perintah berikut.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Contoh Konsumen Sederhana

Sampai sekarang kami telah membuat produser untuk mengirim pesan ke cluster Kafka. Sekarang mari kita buat konsumen untuk mengkonsumsi pesan dari cluster Kafka. KafkaConsumer API digunakan untuk menggunakan pesan dari cluster Kafka. Konstruktor kelas KafkaConsumer didefinisikan di bawah ini.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - Kembalikan peta konfigurasi konsumen.

Kelas KafkaConsumer memiliki metode signifikan berikut yang didaftar di dalam tabel di bawah ini.

S.No Metode dan Deskripsi
1

public java.util.Set<TopicPar-tition> assignment()

Dapatkan sekumpulan partisi yang saat ini ditetapkan oleh konsumen.

2

public string subscription()

Berlangganan ke daftar topik yang diberikan untuk mendapatkan partisi yang ditandatangani secara dinamis.

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

Berlangganan ke daftar topik yang diberikan untuk mendapatkan partisi yang ditandatangani secara dinamis.

4

public void unsubscribe()

Berhenti berlangganan topik dari daftar partisi yang diberikan.

5

public void sub-scribe(java.util.List<java.lang.String> topics)

Berlangganan ke daftar topik yang diberikan untuk mendapatkan partisi yang ditandatangani secara dinamis. Jika daftar topik yang diberikan kosong, itu diperlakukan sama dengan unsubscribe ().

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

Pola argumen mengacu pada pola berlangganan dalam format ekspresi reguler dan argumen pendengar mendapat pemberitahuan dari pola berlangganan.

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

Tetapkan daftar partisi ke pelanggan secara manual.

8

poll()

Ambil data untuk topik atau partisi yang ditentukan menggunakan salah satu API langganan / penetapan. Ini akan mengembalikan kesalahan, jika topik tidak berlangganan sebelum pengumpulan data.

9

public void commitSync()

Offset komit yang dikembalikan pada jajak pendapat terakhir () untuk semua daftar topik dan partisi yang dibuat sub-juru tulis. Operasi yang sama diterapkan ke commitAsyn ().

10

public void seek(TopicPartition partition, long offset)

Ambil nilai offset saat ini yang akan digunakan konsumen pada metode poll () berikutnya.

11

public void resume()

Lanjutkan partisi yang dijeda.

12

public void wakeup()

Bangunkan konsumen.

ConsumerRecord API

API ConsumerRecord digunakan untuk menerima catatan dari klaster Kafka. API ini terdiri dari nama topik, nomor partisi, dari mana record diterima dan offset yang menunjuk ke record di partisi Kafka. Kelas ConsumerRecord digunakan untuk membuat catatan konsumen dengan nama topik tertentu, jumlah partisi, dan pasangan <kunci, nilai>. Ini memiliki tanda tangan berikut.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - Nama topik untuk catatan konsumen yang diterima dari cluster Kafka.

  • Partition - Partisi untuk topik.

  • Key - Kunci rekaman, jika tidak ada kunci, null akan dikembalikan.

  • Value - Rekam isinya.

API ConsumerRecords

ConsumerRecords API bertindak sebagai wadah untuk ConsumerRecord. API ini digunakan untuk menyimpan daftar ConsumerRecord per partisi untuk topik tertentu. Pembuatnya didefinisikan di bawah ini.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - Kembalikan peta partisi untuk topik tertentu.

  • Records - Kembali daftar ConsumerRecord.

Kelas ConsumerRecords memiliki metode berikut yang ditentukan.

S.No Metode dan Deskripsi
1

public int count()

Jumlah rekaman untuk semua topik.

2

public Set partitions()

Kumpulan partisi dengan data dalam kumpulan record ini (jika tidak ada data yang dikembalikan maka kumpulan tersebut kosong).

3

public Iterator iterator()

Iterator memungkinkan Anda untuk menggilir koleksi, mendapatkan atau memindahkan elemen.

4

public List records()

Dapatkan daftar record untuk partisi yang diberikan.

Pengaturan konfigurasi

Pengaturan konfigurasi untuk pengaturan konfigurasi utama API klien konsumen tercantum di bawah ini -

S.No Pengaturan dan Deskripsi
1

bootstrap.servers

Daftar pialang bootstrap.

2

group.id

Menetapkan konsumen individu ke grup.

3

enable.auto.commit

Aktifkan komit otomatis untuk offset jika nilainya benar, jika tidak, tidak dilakukan.

4

auto.commit.interval.ms

Kembalikan seberapa sering offset yang dikonsumsi yang diperbarui ditulis ke ZooKeeper.

5

session.timeout.ms

Menunjukkan berapa milidetik Kafka akan menunggu ZooKeeper menanggapi permintaan (baca atau tulis) sebelum menyerah dan terus menggunakan pesan.

Aplikasi SimpleConsumer

Langkah-langkah aplikasi produsen tetap sama di sini. Pertama, mulai pialang ZooKeeper dan Kafka Anda. Kemudian buat aplikasi SimpleConsumer dengan kelas java bernama SimpleCon-sumer.java dan ketikkan kode berikut.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - Aplikasi dapat dikompilasi menggunakan perintah berikut.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − Aplikasi dapat dijalankan dengan menggunakan perintah berikut

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input- Buka CLI produser dan kirim beberapa pesan ke topik. Anda dapat meletakkan input smple sebagai 'Halo Konsumen'.

Output - Berikut akan menjadi outputnya.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer