Support for custom delete write model strategies in Kafka Sink

Hi, I have a Kafka topic that I’d like to persist to MongoDB using the official connector.

However, I’m struggling to get it to work with the current version. The topic we’d like to persist has a String key and a JSON payload, and the id for the MongoDB document is part of that payload.

So far, so good. However, when we receive a tombstone event, we’d like to delete all documents that contain the record’s key in a certain field. This currently causes two issues:

  1. Writing is not possible, since the connector fails on the first tombstone event and then stops.
  2. Deleting is not possible, since there is no way to replace the write model used for deletes in a way similar to writemodel.strategy.
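To pin down the tombstone semantics described above, here is a minimal in-memory sketch in Kotlin. It does not touch the connector or the MongoDB driver: `SinkRecordSketch` and `applyRecord` are hypothetical names, and the field name `some_field` is borrowed from the sample strategy later in this thread. A record with a null value is treated as a tombstone and removes every document whose `some_field` equals the record key, roughly what a delete-many with an equality filter would do against a real collection.

```kotlin
// Hypothetical stand-in for a Kafka record with a String key and a JSON-like
// payload; a null value marks a tombstone.
data class SinkRecordSketch(val key: String, val value: Map<String, Any?>?)

// Field assumed (hypothetically) to hold the Kafka key in every stored document.
const val SHARED_ID_FIELD_NAME = "some_field"

// Applies one record to an in-memory "collection". A tombstone removes every
// document whose SHARED_ID_FIELD_NAME equals the record key (the in-memory
// analogue of a delete-many with an equality filter); any other record is
// simply appended, whereas the real sink would upsert it.
fun applyRecord(collection: MutableList<Map<String, Any?>>, record: SinkRecordSketch) {
    val value = record.value
    if (value == null) {
        collection.removeAll { it[SHARED_ID_FIELD_NAME] == record.key }
    } else {
        collection.add(value)
    }
}
```

The point of the sketch is that one tombstone can remove many documents, which a single delete-one by `_id` cannot express.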

Is there any way around this? If not, would it be possible to get this PR merged? https://github.com/mongodb/mongo-kafka/pull/108

Kind regards

Thanks @Arne_Klein for your PR. We will review it within the next quarter as part of our 1.8 release. Could you provide an example, ideally sample code or pseudocode, of how you would use this custom strategy?

Thank you @Robert_Walters for the fast response. The basic idea is that I want to keep every version of a document until the document is deleted. My setup would look roughly as follows:

Rough configuration of the sink:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
deletewritemodel.strategy=arneklein.connector.strategy.DeleteManyWriteStrategy
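The id and write model settings in this configuration can be approximated in the same in-memory style. This is an illustration under assumptions, not the connector’s code: the allow-listed fields `entityId` and `version` are hypothetical placeholders for the elided projection list. The `_id` is built from those fields, as PartialValueStrategy does with an AllowList projection, and each payload is upserted by that `_id`, so every new version lands in its own document while a replayed version overwrites its earlier copy. UpdateOneTimestampsStrategy also records insert and modification timestamps, which the sketch leaves out.

```kotlin
// Hypothetical allow-list standing in for the elided value of
// document.id.strategy.partial.value.projection.list.
val PROJECTION_FIELDS = setOf("entityId", "version")

// Mimics PartialValueStrategy with an AllowList projection: the _id is the
// payload restricted to the allow-listed fields.
fun projectId(payload: Map<String, Any?>): Map<String, Any?> =
    payload.filterKeys { it in PROJECTION_FIELDS }

// Mimics an upsert keyed by _id (timestamps omitted). A new version yields a
// new _id and therefore a new document; replaying a version overwrites it.
fun upsert(
    collection: MutableMap<Map<String, Any?>, Map<String, Any?>>,
    payload: Map<String, Any?>,
) {
    val id = projectId(payload)
    collection[id] = payload + ("_id" to id)
}
```

Because the version is part of the `_id`, history accumulates until a tombstone clears it through the shared key field.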

And here is the DeleteManyWriteStrategy:

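package arneklein.connector.strategy

import com.mongodb.client.model.DeleteManyModel
import com.mongodb.client.model.Filters
import com.mongodb.client.model.WriteModel
import com.mongodb.kafka.connect.sink.converter.SinkDocument
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy
import org.apache.kafka.connect.errors.DataException
import org.bson.BsonDocument

// Deletes every document whose SHARED_ID_FIELD_NAME matches the record key.
class DeleteManyWriteStrategy : WriteModelStrategy {
    override fun createWriteModel(sinkDocument: SinkDocument): WriteModel<BsonDocument> {
        // keyDoc is an Optional<BsonDocument>; unwrap it, since a tombstone
        // without a key cannot be turned into a delete filter.
        val keyDoc = sinkDocument.keyDoc.orElseThrow {
            DataException("Cannot build a delete filter: the tombstone record has no key")
        }
        // Compares the field against the converted key document.
        val filter = Filters.eq(SHARED_ID_FIELD_NAME, keyDoc)
        return DeleteManyModel(filter)
    }

    companion object {
        private const val SHARED_ID_FIELD_NAME = "some_field"
    }
}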
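The request boils down to letting the sink use a separate, configurable strategy for tombstones. The routing idea can be sketched as follows; every type here is a hypothetical, simplified stand-in (not the connector’s WriteModelStrategy or the driver’s WriteModel classes), and treating a null value as the tombstone signal is an assumption about how such a switch would be wired.

```kotlin
// Hypothetical, simplified stand-ins for the driver's write models.
sealed class WriteModelSketch
data class UpsertOne(val id: Any?, val doc: Map<String, Any?>) : WriteModelSketch()
data class DeleteMany(val field: String, val value: Any?) : WriteModelSketch()

// Hypothetical record: a null value marks a tombstone.
data class RecordSketch(val key: String?, val value: Map<String, Any?>?)

// Hypothetical counterpart of a pluggable write model strategy.
fun interface StrategySketch {
    fun createWriteModel(record: RecordSketch): WriteModelSketch
}

// Sends tombstones to the delete strategy and every other record to the
// regular write strategy, so each side can be configured independently.
class RoutingSketch(
    private val writeStrategy: StrategySketch,
    private val deleteStrategy: StrategySketch,
) {
    fun modelFor(record: RecordSketch): WriteModelSketch =
        if (record.value == null) deleteStrategy.createWriteModel(record)
        else writeStrategy.createWriteModel(record)
}
```

Keeping the delete path behind its own strategy lets the regular write strategy stay unchanged while only the tombstone behavior is swapped out, in the same pluggable style as writemodel.strategy.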

Please treat this just as a code sample. I can try to provide real code as well, but depending on how soon the next version is published it might take me a little longer, since I would then have to publish my own build of the MongoDB connector.

Great, thanks for the details! I created https://jira.mongodb.org/browse/KAFKA-320 to track this request. To set expectations on the timeline, it will be at least 3-6 months before we can address this ticket.