Key conversion error with Kafka Sink connector

I’m trying to read messages from our Kafka cluster into my local MongoDB database.
For testing I’m using the MongoDB Kafka Sink connector 1.8.0. I’m currently running macOS 12.6, but the eventual target environment will be a Linux distribution.

I’m able to dump the Kafka message contents; however, when I try to use the message key as the _id, I run into issues:

My test configuration is:

name=mongodb-local-sink-4
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=mongodb://localhost/?replicaSet=rs0
tasks.max=1
topics=some_topic
database=sink_test
collection=sink_test
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
#document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInKeyStrategy
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter

My error message is:

[2022-10-17 12:01:16,001] ERROR [mongodb-local-sink|task-0] WorkerSinkTask{id=mongodb-local-sink-4-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
[...]
Caused by: org.apache.kafka.connect.errors.DataException: org.apache.kafka.connect.errors.DataException: Could not convert key `00a7e296-a4b5-4404-836d-b15fc54122a7` into a BsonDocument.
	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.handleTolerableWriteException(StartedMongoSinkTask.java:228)
[...]
Caused by: org.apache.kafka.connect.errors.DataException: Could not convert key `00a7e296-a4b5-4404-836d-b15fc54122a7` into a BsonDocument.
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:161)
[...]
Caused by: org.bson.json.JsonParseException: Invalid JSON number
	at org.bson.json.JsonScanner.scanNumber(JsonScanner.java:444)

The connector apparently tries to parse our message key 00a7e296-a4b5-4404-836d-b15fc54122a7 as JSON; since the string starts with digits, the parser treats it as a number and fails at the first non-numeric character. For my processing I’ll need the full key.
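As far as I can tell from the stack trace (LazyBsonDocument → JsonScanner), the raw key string ends up in a BsonDocument parse. A minimal sketch that reproduces the same exception, assuming that is indeed the path taken:

import org.bson.BsonDocument;

public class KeyParseRepro {
    public static void main(String[] args) {
        // A bare UUID is not valid JSON: the scanner sees the leading
        // digits, starts scanning a number, and fails at the first letter.
        // Throws org.bson.json.JsonParseException: Invalid JSON number
        BsonDocument.parse("00a7e296-a4b5-4404-836d-b15fc54122a7");
    }
}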

Checking the documentation, I’m not sure how to tell the connector to use the external key directly instead of trying to generate an ObjectId.

I just started with the sink connector and came here for this exact reason. The root cause seems to be an implicit assumption throughout the sink connector code that the Kafka key, if present, is parseable as a document. It looks like you are using scalar strings as keys, which is what we are doing as well. If you look in StringRecordConverter you’ll see that these are parsed as BsonDocuments, not BsonStrings.

As someone with a lot of experience with Kafka and stream processing, I can say for sure that using complex types of any sort as keys in Kafka is a bad idea. The fact that the sink connector seems to require this is even worse, since it is likely to encourage people to do a bad thing. The reason you don’t want complex keys is that their semantics don’t match Kafka’s own semantics for determining key equality: Kafka compares keys byte for byte, so things like the order of keys in an object, pretty-printing, and extra whitespace, which are insignificant to JSON, are very significant to Kafka and can cause issues with partitioning, log compaction, and so on.
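To make that concrete, here is a small sketch of the hashing that Kafka’s default partitioner applies to keyed records (murmur2 over the raw key bytes); the class name and sample keys are made up for the demo:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class JsonKeyPartitionDemo {
    // The default partitioner hashes the serialized key bytes,
    // so any byte-level difference can change the target partition.
    static int partition(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // Three semantically identical JSON keys, byte-wise different:
        System.out.println(partition("{\"id\":1,\"region\":\"eu\"}", 6));
        System.out.println(partition("{\"region\":\"eu\",\"id\":1}", 6));      // field order flipped
        System.out.println(partition("{ \"id\": 1, \"region\": \"eu\" }", 6)); // extra whitespace
    }
}

The three calls can land on three different partitions even though every JSON parser would consider the keys equal.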

We are trying to figure out how to work around this now, but at the moment this behavior feels like a deal breaker if you’re unable or unwilling to use complex keys.


FYI, I found this post, which details using a simple Connect transform to force the key into a document shape before the sink connector processes it, and it seems to work: Kafka sink connector : How to get Kafka message key into document - #3 by hpgrahsl


Thanks! With some minor modifications that worked for me, since I wanted to keep the other fields.

name=mongodb-local-sink-4
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=mongodb://localhost/?replicaSet=rs0
tasks.max=1
topics=some_topic
database=sink_test
collection=sink_test
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
transforms=hk
transforms.hk.type=org.apache.kafka.connect.transforms.HoistField$Key
transforms.hk.field=_id
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
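For anyone wondering what the SMT is doing: HoistField$Key wraps the scalar key in a single-field struct, so the plain string key becomes a document containing only _id, which ProvidedInKeyStrategy can then pick up. A quick standalone sketch of the transform (the topic, offset, and demo class name are made up):

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.HoistField;

public class HoistKeyDemo {
    public static void main(String[] args) {
        HoistField.Key<SinkRecord> hoist = new HoistField.Key<>();
        hoist.configure(Map.of("field", "_id"));

        // StringConverter hands the connector a plain string key.
        SinkRecord before = new SinkRecord("some_topic", 0,
                Schema.STRING_SCHEMA, "00a7e296-a4b5-4404-836d-b15fc54122a7",
                null, null, 0L);

        SinkRecord after = hoist.apply(before);
        // The key is now a one-field struct: Struct{_id=00a7e296-...},
        // which the sink connector can convert into a BsonDocument.
        System.out.println(after.key());
        hoist.close();
    }
}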
