Kafka source - how to set partition key?

I’m using the Kafka source connector to build a change data capture pipeline from our legacy applications that store data in Mongo.

It is important to us that our sinks receive changes to a Mongo document in the right order (e.g. the insert must come first, followed by any updates, and possibly a delete).

I believe the answer is to partition our topic based on documentId. That way a fast sink cannot process a document's delete before a slower sink has processed its insert.

But how do we configure the Kafka Source Connector to use the documentId (or any other field) as the partition key?

Hi,

So you are right that, in general, your change stream documents will only be ordered within one and the same Kafka topic partition. For the Kafka source connector scenario this means that the key of the Kafka Connect source record must be properly constructed.

My bad, I got this wrong myself on the first answering attempt because I was working on and reading different things in parallel :see_no_evil: which I shouldn’t do :grimacing:

Addendum: By default, however, the _id field will contain the resume token, NOT the original document’s _id. This means you have to explicitly change the key on the way into Kafka. For this you could try to apply a proper SMT configuration with any of the existing SMTs, and if this doesn’t work - depending on your other connector config settings - you could implement a custom SMT which allows you to change the records’ keys to the _id field you need.
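If you do go the custom SMT route, here is a rough sketch of what such a transformation could look like. This is only a hypothetical example: the class/package names and field names are illustrative, and it assumes the connector is emitting schema’d (Struct) values that contain a documentKey struct with an _id field; with JSON-string output you would have to parse the value instead.

```java
// Hypothetical sketch only: class/package and field names are illustrative.
// Assumes the source connector emits Struct values containing "documentKey._id".
package com.example.kafka.connect;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class DocumentIdAsKey<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Leave tombstones and non-Struct values (e.g. plain JSON strings) untouched.
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        if (value.schema().field("documentKey") == null) {
            return record;
        }
        Struct documentKey = value.getStruct("documentKey");
        if (documentKey == null) {
            return record;
        }
        Object id = documentKey.get("_id");

        // Re-key the record with the document id so all events for the same document
        // hash to the same partition; the partition itself is left as-is (usually null
        // for source records, so the producer will partition by the new key).
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                Schema.OPTIONAL_STRING_SCHEMA,
                id == null ? null : id.toString(),
                record.valueSchema(),
                record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Nothing to configure in this sketch.
    }

    @Override
    public void close() {
    }
}
```

You would then wire it in with something like transforms=createKey and transforms.createKey.type=<your fully qualified class name> in the connector configuration.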

Also, you can easily verify what the actual key is, given your specific configuration, by inspecting the Kafka records in the target topic directly, e.g. by means of the Kafka (Avro) console consumer, which allows you to print the key of each record together with the value. Then you can see explicitly what data is contained in your record’s key and whether it contains what it should / what you expect.
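For example, something along these lines (broker address and topic name are placeholders; adjust the script name for your distribution, or use kafka-avro-console-consumer with --property schema.registry.url=... if your keys/values are Avro):

```
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic my.mongo.topic --from-beginning \
  --property print.key=true --property key.separator=" | "
```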

Hope this helps!


Sorry for reviving this old topic, but I think this is relevant. I actually did create a custom SMT that suits my needs. We run the connector in Docker, hosted by Kubernetes. On my local environment it all works just fine, but as soon as I try to deploy it, it fails with:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 
error(s):\nInvalid value com.az.ip.dmdp.commons.kafka.connect.DocumentKeyIdAsRecordKey$Value for 
configuration transforms.createKey.type: Class 
com.az.ip.dmdp.commons.kafka.connect.DocumentKeyIdAsRecordKey$Value could not be found.\nInvalid 
value null for configuration transforms.createKey.type: Not a Transformation\nYou can also find the above 
list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

So it can't find the class. I can see that it loads the plugin though. I am really desperate here, and any help on HOW to get this working is much appreciated. I have placed my custom SMT under

/usr/share/java/mongodb-kafka-connect-mongodb/lib/, the folder where the connector is. The plugin.path property is set to /usr/share/java, which is the parent of all plugins in my Docker container.

It might be related to Kafka Connect and not to you, and if so I apologise for bothering you.

One thing that went wrong for me: I was projecting only some fields, but I had set my partition key to a field that was not among the projected fields.

You might want to look at the server error logs: I have found them more useful than the HTTP responses.

You can change the log level at runtime by sending an HTTP PUT to /admin/loggers/[logName] with the level you want.
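For example (worker host/port and logger name are placeholders; the dynamic log-level endpoint exists in newer Kafka Connect versions, added via KIP-495):

```
curl -X PUT -H "Content-Type: application/json" \
  -d '{"level": "TRACE"}' \
  http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime
```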

I actually found the solution. It was my mistake: I didn't take distribution into account. We have rolling deploys, so the old container and the new one joined the same cluster. The old one was already running, so it became the master and got the new config update, and therefore threw the error message :slight_smile: Now all is well.
