KafkaJSでの指導者選挙を待っています

Aug 25 2020

状況

kafkajsを使用して、動的に生成されたkafkaトピックに書き込みます。

プロデューサーを登録した直後にこれらのトピックに書き込むと、定期的にエラーが発生することがわかりましたThere is no leader for this topic-partition as we are in the middle of a leadership election

完全なエラーは次のとおりです。

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

コード

問題の原因となっているコードは次のとおりです。

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  await producer.connect()
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

質問

2つの質問:

  1. プロデューサーが本当にkafkaトピックにデータを送信する準備ができるまでロジックがブロックされるようにするために、プロデューサーに接続(または送信)するときに特別なことをする必要がありますか?
  2. メッセージがドロップされないようにするために、プロデューサーにデータを送信するときに特別なことはありますか?

回答

slifty Aug 25 2020 at 01:23

ソリューション

Kafkajsは、オプションのフラグを持つ管理クライアントをcreateTopics介してメソッドを提供します。waitForLeaders

admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'myRandomTopicString123' },
  ],
}

これを使用すると、問題が解決します。

import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  const admin = kafka.admin()
  await admin.connect()
  await producer.connect()
  await admin.createTopics({
    waitForLeaders: true,
    topics: [
      { topic: 'myRandomTopicString123' },
    ],
  })
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

残念ながら、トピックがすでに存在する場合、これは別のエラーになりますが、それは別の質問であり、エラーは壊すよりも情報量が多いと思います。

{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}