How to convert a String field to ObjectId in MongoSinkConnector

Hi,

I’m trying to sink a Kafka topic to MongoDB using com.mongodb.kafka.connect.MongoSinkConnector, and the messages follow this Avro definition:

record MyObjectType {
    string id;
    string userId;
}

What I want to do is to make the sink connector treat the userId attribute as the _id (of type ObjectId) in the upserted MongoDB document.

I.e., given this Avro message:

{
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

I want the following MongoDB document:

{
    "_id" : ObjectId("60a50547e578c87ac72f1041"),
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

I have made various attempts, e.g. using SMTs to reshape the incoming Avro message so that it resembles the Extended JSON format { _id : {$oid: "60a50547e578c87ac72f1041"} }.

The closest I’ve got is with the connector configuration below:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://local-kafka-schema-registry-cp-schema-registry:8081",
"value.converter.schemas.enable": true,
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
"transforms":"createKey,RenameField_userId,HoistField_id",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"userId",
"transforms.RenameField_userId.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.RenameField_userId.renames": "userId:oid",
"transforms.HoistField_id.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.HoistField_id.field": "_id"

With that configuration, this document is inserted into MongoDB:

{
    "_id" : {
        "oid" : "60a50547e578c87ac72f1041"
    },
    "id" : "60a50547e578c87ac72f1042",
    "userId" : "60a50547e578c87ac72f1041"
}

Some notes:

  • Ideally I would also like id and userId to be converted to ObjectId in the target collection.
  • I can change the structure of the Avro if needed, but I cannot change the fact that the resulting _id must be an ObjectId.
  • The only document.id.strategy I’ve found that works with ObjectId is the BsonOidStrategy, but that one generates a new random ObjectId rather than one based on a value in the message.

Any suggestions to accomplish this would be greatly appreciated!

You need to use the DocumentIdAdder post processor. It looks like you are on the right track above, though I'm not sure you need all those transforms to make it happen.
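
As a rough sketch of what that could look like (not a tested configuration): DocumentIdAdder is the post processor that sets _id using whatever document.id.strategy is configured. As far as I know, none of the built-in strategies converts a string into an ObjectId, so one option is a custom strategy. The class com.example.UserIdAsObjectIdStrategy below is hypothetical. You would have to write it yourself by implementing the connector's IdStrategy interface and returning a BsonObjectId built from the userId value, then make the jar available to Kafka Connect. The fragment is in .properties form only so that it can carry comments; the same keys apply in the JSON form you are using.

```properties
# DocumentIdAdder is the connector's post processor that writes _id
# into the document using the configured id strategy.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder

# HYPOTHETICAL class, not shipped with the connector. Assumed to implement
# com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy and to
# return a BsonObjectId created from the value's userId string.
document.id.strategy=com.example.UserIdAsObjectIdStrategy

# Converter settings carried over unchanged from the question.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://local-kafka-schema-registry-cp-schema-registry:8081
```

If the strategy reads userId straight from the record value like this, the ValueToKey, ReplaceField$Key and HoistField$Key transforms should no longer be needed.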