Mongodb connector - delete document by using sink connector

Hello. I have used mongodb sink connector for writing the message into database. I integrated with Create and update that’s working fine. I wish I want to delete the document completely based on document fields. I have given below curl configuration it’s not working properly. Please, pointout what i have done wrong…
curl -X POST -H “Content-Type: application/json” -d '{“name”:“test-testing-delete”,

“config”:{“topics”:“movies”,

“connector.class”:“com.mongodb.kafka.connect.MongoSinkConnector”,

“tasks.max”:“1”,

“connection.uri”:“mongodb://localhost:27017”,

“database”:“flower”,

“collection”:“movies”,

“key.converter”:“org.apache.kafka.connect.storage.StringConverter”,

“value.converter”:“org.apache.kafka.connect.storage.StringConverter”,

“key.converter.schemas.enable”:“false”,

“value.converter.schemas.enable”:“false”,

“document.id.strategy.overwrite.existing”:true,

“document.id.strategy”:“com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy”,

“document.id.strategy.partial.key.projection.list”:“id”,

“document.id.strategy.partial.key.projection.type”:“ALLOWLIST”,

“writemodel.strategy”:“com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy”

}}’ localhost:8083/connectors

Here is an example of this exact scenario MongoDB Connector for Apache Kafka 1.5 Available Now | MongoDB Blog

Im not sure what your source data in the topic looks like, but it looks like you are using the key project, perhaps try the value

  "document.id.strategy.partial.value.projection.type": "AllowList",
  "document.id.strategy.partial.value.projection.list": "id",

Hi. Actually , My implementation also same which you have shared with me. some times, update and delete working fine sometimes its not working as i expected. I have posted my config below, please point out what I did wrong?,
Update config,
curl -X POST -H “Content-Type: application/json” -d ‘{“name”:“test-testing-update11”,
“config”:{“topics”:“employees”,
“connector.class”:“com.mongodb.kafka.connect.MongoSinkConnector”,
“tasks.max”:“1”,
“connection.uri”:“mongodb://localhost:27017”,
“database”:“flower”,
“collection”:“employees”,
“key.converter”:“org.apache.kafka.connect.storage.StringConverter”,
“value.converter”:“org.apache.kafka.connect.storage.StringConverter”,
“key.converter.schemas.enable”:“false”,
“value.converter.schemas.enable”:“false”,
“document.id.strategy.overwrite.existing”:true,
“document.id.strategy”:“com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy”,
“document.id.strategy.partial.value.projection.list”:“id”,
“document.id.strategy.partial.value.projection.type”:“AllowList”,
“writemodel.strategy”:“com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy”
}}’ localhost:8083/connectors

Delete config,
curl -X POST -H “Content-Type: application/json” -d ‘{“name”:“test-testing-delete36”,
“config”:{“topics”:“subjects”,
“connector.class”:“com.mongodb.kafka.connect.MongoSinkConnector”,
“tasks.max”:“1”,
“connection.uri”:“mongodb://localhost:27017”,
“database”:“flower”,
“collection”:“subjects”,
“key.converter”:“org.apache.kafka.connect.storage.StringConverter”,
“value.converter”:“org.apache.kafka.connect.storage.StringConverter”,
“key.converter.schemas.enable”:“false”,
“value.converter.schemas.enable”:“false”,
“document.id.strategy”:“com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy”,
“document.id.strategy.partial.value.projection.list”:“class”,
“document.id.strategy.partial.value.projection.type”:“AllowList”,
“writemodel.strategy”:“com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy”
}}’ localhost:8083/connectors

If it is sometimes working, my guess is the data that is on the kafka topic doesn’t include the key value pair that contains the class field.

data will be like this {“id”:“ESC001”,“class”:“2”}
In distributed.properties file except plugin.path what should I include?

Could you please give the implementation link for update?. because, whenever I am giving curl command its inserting onemore record with new object id and _id field. I don’t know what I am doing wrong,
Update Curl command is below,

curl -X POST -H “Content-Type: application/json” -d ‘{“name”:“test-testing-update1”,
“config”:{“topics”:“products”,
“connector.class”:“com.mongodb.kafka.connect.MongoSinkConnector”,
“tasks.max”:“1”,
“connection.uri”:“mongodb://localhost:27017”,
“database”:“flower”,
“collection”:“quickstart”,
“key.converter”:“org.apache.kafka.connect.storage.StringConverter”,
“value.converter”:“org.apache.kafka.connect.storage.StringConverter”,
“key.converter.schemas.enable”:“false”,
“value.converter.schemas.enable”:“false”,
“document.id.strategy”:“com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy”,
“document.id.strategy.partial.value.projection.type”:“AllowList”,
“document.id.strategy.partial.value.projection.list”:“p_id”,
“writemodel.strategy”:“com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy”,
“document.id.strategy.overwrite.existing”:true
}}’ localhost:8083/connectors

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.