Menenggelamkan data topik dari produsen Java ke Mongodb

Aug 15 2020

Saya menghasilkan data dengan java dan memasukkannya ke topik Kafka setelah itu saya ingin data ini dimasukkan ke MongoDB. Ketika saya mengirim data sebagai JSON melalui JAVA, itu tidak akan disimpan ke MongoDB karena kesalahan ini.

[2020-08-15 18:42:19,164] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JSON reader was expecting a value but found 'siqdj'. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'siqdj'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at com.mongodb.kafka.connect.sink.converter.SinkConverter.convert(SinkConverter.java:44)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:229)
        at java.util.ArrayList.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:228)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:169)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
        at java.util.ArrayList.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
        at java.util.HashMap.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
[2020-08-15 18:42:19,166] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'siqdj'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at com.mongodb.kafka.connect.sink.converter.SinkConverter.convert(SinkConverter.java:44)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:229)
        at java.util.ArrayList.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:228)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:169)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
        at java.util.ArrayList.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
        at java.util.HashMap.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        ... 10 more

Berikut adalah data yang saya kirim melalui program java saya di konsumen Kafka.

{"name":"This is a test","dept":"siqdj","studentId":1}
{"name":"This is another","dept":"siqdj","studentId":2}

Setiap baris merepresentasikan sebuah record

Ini adalah file konfigurasi saya

connect-standalone.properties

bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/plugins

MongoSinkConnector.properties

name=Kafka_ops
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=TestTopic4
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=student_kafka
collection=students
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Jawaban

NARASIMHAMURTHY Aug 21 2020 at 18:14

Tariq - Saya bukan ahli dalam topik ini. Tetapi saya telah mencoba hal serupa dengan adaptor wastafel JDBC dengan database Oracle. Format data yang Anda kirim ke topik tampaknya tidak sesuai bagi saya. Karenanya, Anda mungkin mendapatkan kesalahan. Karena Anda menggunakan JsonConverter, setiap baris dalam topik harus dalam format berikut agar adaptor sink dapat mengurai dan menulis ke penyimpanan data. Saat ini data Anda tidak memiliki skema dalam payload. Karena itu kesalahannya. Harap teruskan di bawah ini ke topik dan lihat apakah itu tenggelam ke MongoDB.

{"schema": {"type": "struct", "fields": [{"type": "string", "opsional": false, "field": "name"}, {"type": "string "," opsional ": true," field ":" dept "}, {" type ":" int64 "," opsional ": true," field ":" studentId "}]," opsional ": false," nama ":" YOUR_TABLE_NAME "}," payload ": {" name ":" This is a test "," dept ":" siqdj "," studentId ": 1}}