Hi everyone,
I am working on a project where I publish data to a Kafka topic (in Avro format), and I want to use a MongoDB sink connector to UPSERT documents and to DELETE a document when I publish a null value to the topic.
The record key contains the values I want to use to find and update a document. The same values should also be used to find and delete a document when I publish a null value to the topic.
The topic has the following format:
KAFKA_TOPIC ->
**key**: { "value_1":"xyz", "value_2":100, "value_3":1000 }
**value**: {
"value_1": "xyz",
"value_2": 100,
"value_3": 1000,
"value_4": 3000,
"value_5": true
}
When I want to delete a document, I send the following to my Kafka topic:
KAFKA_TOPIC ->
**key**: { "value_1":"xyz", "value_2":100, "value_3":1000 }
**value**: null
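To make the behavior I expect concrete, here is a minimal sketch in plain Python. It is only my mental model of the mapping from a Kafka record to a MongoDB write, not the connector's actual implementation. The helper name `expected_operation` and the shape of the returned dictionary are made up for illustration.

```python
from typing import Any, Dict, Optional

# Key fields I want to use as the lookup filter (names from the example above).
KEY_FIELDS = ("value_1", "value_2", "value_3")


def expected_operation(
    key: Dict[str, Any], value: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Describe the MongoDB write I expect for one record (hypothetical helper)."""
    # The filter is built only from the record key, so it is still
    # available when the record value is null.
    filter_doc = {field: key[field] for field in KEY_FIELDS}
    if value is None:
        # Null value (tombstone): delete the document matching the key fields.
        return {"op": "deleteOne", "filter": filter_doc}
    # Non-null value: replace the matching document, inserting it if missing.
    return {
        "op": "replaceOne",
        "filter": filter_doc,
        "replacement": value,
        "upsert": True,
    }
```

So for the two messages above, I would expect a `replaceOne` with `upsert` for the first and a `deleteOne` with the same filter for the second.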
The UPSERT operation works completely fine, but the DELETE does not work at all.
I am trying to understand what I am doing wrong.
Here is the configuration I have for my mongo-sink-connector:
{
"name": "mongodb-sink-test",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":1,
"topics":"test-sink-topic",
"connection.uri":"mongodb://localhost:27017/retryWrites=true&w=majority",
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
"document.id.strategy.partial.key.projection.list":"vid,firstOccurrence,source",
"document.id.strategy.partial.key.projection.type":"ALLOWLIST",
"delete.on.null.values":true,
"max.batch.size":"100",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
"database":"database-test",
"collection":"collection-test",
"document.id.strategy.overwrite.existing":true,
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://schema-registry:8085",
"key.converter.schemas.enable":true,
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable":true,
"value.converter.schema.registry.url":"http://schema-registry:8085",
"errors.log.enable":true,
"errors.log.include.messages":true,
"errors.retry.delay.max.ms":"60000",
"errors.retry.timeout":"0",
"errors.tolerance":"all"
}
}
Any ideas what I am doing wrong?
I appreciate your help!