Kafka sink connector: How to get the Kafka message key into the document

Hi,

How can I make use of the Kafka message key? Example:

Kafka message key:

"foo"

Kafka message value:

"bar"

Desired document in MongoDB:

{ 
  "id": "foo",
  "payload": "bar"
}

I can use the HoistField transformation to wrap these strings in two separate JSON objects, but how do I merge them to get the desired document?

Many Thanks!

Maybe two SMTs? A bit of a guess here, but something like this?

"transforms": "HoistKey,HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "payload",
"transforms.HoistKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistKey.field": "id",

Hey there @Tin_Stribor_Sohn!

The important points are:

  1. If the key and value of your Kafka records are plain strings, use the StringConverter for both.
  2. As you mentioned, you then apply the HoistField SMT to both the key and the value, which gives you JSON for both parts of the Kafka record in Connect.
  3. The MongoDB sink connector works primarily on the value of the record. In addition, you tell it in the config how to create the "_id" field by choosing a DocumentIdStrategy. In your case you want the "_id" field to be exactly what the SMT produces for the key, which is why you specify the ProvidedInKeyStrategy. In other words, what you call the "merge" of key and value is done by choosing the right ID generation strategy.
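To make the three steps concrete, here is a small Python simulation of what happens to a single record. It does not use Kafka Connect or the connector's code. The function names `hoist_field` and `build_document` are made up for illustration and only mimic the behaviour described above, so treat it as a sketch rather than the connector's actual implementation.

```python
def hoist_field(data, field):
    """Mimic the HoistField SMT: wrap the whole key or value in one named field."""
    return {field: data}


def build_document(key_doc, value_doc):
    """Mimic ProvidedInKeyStrategy: take `_id` from the key document
    and set it on a copy of the value document."""
    if "_id" not in key_doc:
        raise ValueError("key document must contain an '_id' field")
    document = dict(value_doc)  # copy so the input is not modified
    document["_id"] = key_doc["_id"]
    return document


# A record whose key and value are plain strings (read with StringConverter).
record_key, record_value = "foo", "bar"

key_doc = hoist_field(record_key, "_id")          # key wrapped as {"_id": "foo"}
value_doc = hoist_field(record_value, "payload")  # value wrapped as {"payload": "bar"}

print(build_document(key_doc, value_doc))
```

The printed document has `_id` set to "foo" and `payload` set to "bar", which is the shape asked for in the question, except that MongoDB's identifier field is named `_id` rather than `id`.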

That should do it! The following config snippet should give you the desired outcome; adapt it to your needs:

{
    "name": "demo-sink-keyvalue",
    "config": {
        "topics": "keyvaluedemo",
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "tasks.max": "1",
        "connection.uri": "mongodb://mongodb:27017",
        "database": "forum_qa",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
        "transforms": "hk,hv",
        "transforms.hk.type": "org.apache.kafka.connect.transforms.HoistField$Key",
        "transforms.hk.field": "_id",
        "transforms.hv.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.hv.field": "payload"
    }
}