How to get Kafka event KEY (string) as the document _id

I’m new to mongo DB and trying to use the Sink connector to populate a collection. My goal is to use the KEY of the event as the _id for the document. The goal will be to UPDATE a document when an event with the same KEY is received and DELETE the document when an event with an empty value (a.k.a. tombstone event) is received.

I am stuck trying to use the Key of the event which is a string as the unique _id of the document.

KEY (string)
“ABC-123456-XYZ”

Value
Some AVRO event

When I use
“document.id.strategy”: “com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy”,

I get the the following error:
Could not convert key ABC-123456-XYZ into a BsonDocument.

There are similar questions where the KEY is used as part of the document (see: Kafka sink connector : How to get Kafka message key into document), but I cannot find information on using the String KEY.

Thanks in advance.

The answer was there I just had to keep on looking. Here are the connector settings that worked for me.

{
    "name": "MongoAccountSinkConnector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "name": "MongoAccountSinkConnector",
        "topics": "ACCOUNT",
        "connection.uri": "mongodb://...",
        "database": "local",
        "collection": "Account",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://...:28081",
        "mongodb.delete.on.null.values": "true",
        "delete.on.null.values": "true",
        "document.id.strategy.overwrite.existing": "true",
        "document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
        "transforms":"hk",
        "transforms.hk.type":"org.apache.kafka.connect.transforms.HoistField$Key",
        "transforms.hk.field":"_id",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy"
    }
}

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.