Kafka Connect MongoDB Source Connector - how to set partition key?

Someone posted a similar topic a while ago (Kafka source - how to set partition key?), but if there's a solution in that thread, I'm missing it.

I am using MongoDB source connector to send document updates to a Kafka topic.
Like the poster of the above, I need to set the partition key to ensure that these updates are consumed from the Kafka topic in order. So, how do you do this? I've tried to leverage org.apache.kafka.connect.transforms.ValueToKey, alone and in conjunction with other transforms like Cast, ExtractField, etc., but I keep getting the same error every time: "Only Struct objects supported for [copying fields from value to key], found: java.lang.String".
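For context on why the key matters for ordering: Kafka guarantees order only within a partition, and the default partitioner sends equal keys to the same partition. Here's a minimal Python sketch of that idea (using md5 purely for illustration; real Kafka clients use a murmur2 hash, but the principle is identical):

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition deterministically.

    Illustrative only: Kafka's default partitioner uses murmur2,
    not md5, but either way equal keys always land on the same
    partition, so per-key ordering is preserved for consumers.
    """
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All updates carrying the same document key go to one partition,
# so a consumer reads them in the order they were produced.
p1 = partition_for(b"doc-42", 6)
p2 = partition_for(b"doc-42", 6)
assert p1 == p2
```

This is why messages with a null key (the default if you never set one) can interleave across partitions: without a key, the producer spreads records round-robin/sticky across partitions and the per-document ordering is lost.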

Is this not possible with OOTB transforms? This seems like the most basic thing, but has me stumped.

Here's the relevant portion of my connector config (note: keyId is a 36-character string field on the Mongo doc):

"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"transforms": "copyIdToKey",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields":"keyId",
"publish.full.document.only": "false",

Well, no one offered any help, but I did manage to figure this out. Basically, you just need to set these in your connector:

"output.format.value": "json",
"output.format.key": "schema",
"output.schema.key": "{\"type\":\"record\",\"name\":\"keySchema\",\"fields\":[{\"name\":\"fullDocument.myField\",\"type\":\"string\"}]}"

where "myField" is the document field you want to partition by. With these settings the connector builds the record key itself, so the ValueToKey transform is no longer needed.
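For reference, a complete config combining both posts might look like the sketch below. The connection.uri, database, collection, and topic-related values are placeholders you'd replace with your own; keyId is the field from my original config, addressed as fullDocument.keyId because the source connector emits change-stream events with the document nested under fullDocument:

```json
{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "connection.uri": "mongodb://localhost:27017",
  "database": "myDb",
  "collection": "myCollection",
  "publish.full.document.only": "false",
  "output.format.value": "json",
  "output.format.key": "schema",
  "output.schema.key": "{\"type\":\"record\",\"name\":\"keySchema\",\"fields\":[{\"name\":\"fullDocument.keyId\",\"type\":\"string\"}]}",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```

Note the escaped quotes inside output.schema.key: the Avro-style key schema is itself a JSON string embedded in the JSON config, which is easy to get wrong when pasting from a forum post.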

