I’m trying to read messages from our Kafka cluster into my local MongoDB database.
For testing, I’m using MongoDB Kafka sink connector 1.8.0. I’m currently running macOS 12.6, but the eventual target environment will be some Linux distribution.
I’m able to dump the content of the Kafka messages. However, when I try to use the message key as the _id, I run into issues.
My test configuration is:
name=mongodb-local-sink-4
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=mongodb://localhost/?replicaSet=rs0
tasks.max=1
topics=some_topic
database=sink_test
collection=sink_test
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
#document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInKeyStrategy
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
My error message is:
[2022-10-17 12:01:16,001] ERROR [mongodb-local-sink|task-0] WorkerSinkTask{id=mongodb-local-sink-4-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
[...]
Caused by: org.apache.kafka.connect.errors.DataException: org.apache.kafka.connect.errors.DataException: Could not convert key `00a7e296-a4b5-4404-836d-b15fc54122a7` into a BsonDocument.
at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.handleTolerableWriteException(StartedMongoSinkTask.java:228)
[...]
Caused by: org.apache.kafka.connect.errors.DataException: Could not convert key `00a7e296-a4b5-4404-836d-b15fc54122a7` into a BsonDocument.
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:161)
[...]
Caused by: org.bson.json.JsonParseException: Invalid JSON number
at org.bson.json.JsonScanner.scanNumber(JsonScanner.java:444)
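Judging by the stack trace, the connector tries to parse our message key 00a7e296-a4b5-4404-836d-b15fc54122a7 as JSON in order to build a BsonDocument. Because the key starts with the digits 00, the JSON scanner treats it as a number literal and fails with "Invalid JSON number". For my processing, I’ll need the full key.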
I’ve checked the documentation, but I’m not sure how to tell the connector to use the message key directly as the _id instead of generating an ObjectId.
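The following approach is untested here and is a sketch under assumptions. It first wraps the plain string key in a struct with a single `_id` field, using Kafka Connect's built-in `HoistField$Key` transform. `ProvidedInKeyStrategy` can then read `_id` from that key document instead of `FullKeyStrategy` trying to turn the whole string into a document. The alias `hoistKey` is an arbitrary name chosen for this example, and only the lines that differ from the config above are shown.

```properties
# Untested sketch: only the settings that change relative to the config above.
# The key stays a plain string on the wire.
key.converter=org.apache.kafka.connect.storage.StringConverter

# Wrap the string key into a struct with one field named _id.
# "hoistKey" is an arbitrary, hypothetical alias for this transform.
transforms=hoistKey
transforms.hoistKey.type=org.apache.kafka.connect.transforms.HoistField$Key
transforms.hoistKey.field=_id

# Read _id from the (now structured) key instead of using the whole key.
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
```

With this setup, a record keyed 00a7e296-a4b5-4404-836d-b15fc54122a7 would presumably be stored with that string as its _id. Swapping in `UuidProvidedInKeyStrategy` (the commented-out line in the original config) should instead store the _id as a BSON UUID. That strategy likewise expects the key document to carry an `_id` field holding a UUID string or binary value.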