Someone posted a similar topic a while ago ("Kafka source - how to set partition key?"), but if there's a solution in that thread, I'm missing it.
I am using the MongoDB source connector to send document updates to a Kafka topic.
Like the poster of that thread, I need to set the partition key to ensure that these updates are consumed from the Kafka topic in order. So, how do you do this? I've tried to leverage org.apache.kafka.connect.transforms.ValueToKey, both alone and in conjunction with other transforms like Cast, ExtractField, etc., but I keep getting the same error every time: "Only Struct objects supported for [copying fields from value to key], found: java.lang.String".
Is this not possible with OOTB transforms? This seems like the most basic thing, but it has me stumped.
Here's the relevant portion of my connector config (note: keyId is a 36-character string field on the Mongo document):
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"transforms": "copyIdToKey",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields":"keyId",
"publish.full.document.only": "false",
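For completeness, here is a representative example of one of the chained variants I mentioned trying (the transform name extractKeyId and the ordering here are just illustrative); it fails with the same Struct error:

```json
"transforms": "copyIdToKey,extractKeyId",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "keyId",
"transforms.extractKeyId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyId.field": "keyId",
```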