Hi, I have a Kafka topic that I'd like to persist in MongoDB using the official connector.
However, I'm struggling to get it to work with the current version. The topic we'd like to persist has a String key and a JSON payload. The id for the MongoDB document is part of the JSON payload.
So far, everything works. However, when we receive a tombstone event, we'd like to delete all documents whose value in a certain field matches the record key. This currently causes two issues:
- Writing does not work: the connector fails on the first tombstone event and stops processing afterwards.
- Deleting does not work: there is no way to substitute the write model for the delete case, similar to what writemodel.strategy allows for regular writes.
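To make the desired behaviour concrete, here is a minimal, dependency-free Kotlin sketch of the routing we are after: a record whose value is null (a tombstone) should become a "delete many by key" operation, and anything else a normal upsert. `SinkRecordLite`, `Operation` and `route` are hypothetical names for illustration only, not part of the connector's API, and `some_field` is a placeholder field name.

```kotlin
// Hypothetical, simplified stand-in for a Kafka sink record:
// a String key and a JSON payload that is null for tombstones.
data class SinkRecordLite(val key: String, val valueJson: String?)

// The two kinds of operation we want the sink to issue.
sealed class Operation {
    // Normal record: upsert the payload (its id comes from the payload itself).
    data class Upsert(val payloadJson: String) : Operation()

    // Tombstone: delete every document whose `field` equals the record key.
    data class DeleteMany(val field: String, val key: String) : Operation()
}

// Placeholder name of the field that stores the Kafka key in each document.
const val SHARED_ID_FIELD_NAME = "some_field"

// Tombstones (null value) map to DeleteMany, all other records to Upsert.
fun route(record: SinkRecordLite): Operation =
    if (record.valueJson == null) {
        Operation.DeleteMany(SHARED_ID_FIELD_NAME, record.key)
    } else {
        Operation.Upsert(record.valueJson)
    }
```

Today only the upsert branch can be customised via writemodel.strategy; the delete branch is the part that cannot be swapped out.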
Is there any way around this? If not, would it be possible to get this PR merged: Support setting a custom deletewritemode.strategy by ArneKlein (mongodb/mongo-kafka Pull Request #108 on GitHub)?
Kind regards
Thanks @Arne_Klein for your PR. We will review it within the next quarter as part of our 1.8 release. Could you provide an example, ideally sample or pseudo code, of how you would use this custom strategy?
Thank you @Robert_Walters for the fast response. The basic idea is that I want to keep all versions of a document until the document itself is deleted. My setup would look roughly as follows:
Rough configuration of the sink:
```properties
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
deletewritemodel.strategy=arneklein.connector.strategy.DeleteManyWriteStrategy
```
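With PartialValueStrategy and an AllowList projection, the `_id` is built from the value document by keeping only the listed fields, so each version can get its own `_id` as long as the listed fields differ between versions. The following is a simplified Kotlin sketch of that projection, assuming top-level field names only; nested (dotted) paths are not handled, and `partialValueId` as well as the field names in the usage are hypothetical.

```kotlin
// Simplified model of an AllowList projection: keep only the listed
// top-level fields of the value document and use the result as _id.
// Nested (dotted) paths are deliberately not handled in this sketch.
fun partialValueId(value: Map<String, Any?>, projectionList: String): Map<String, Any?> {
    // Parse the comma-separated projection list, ignoring blanks and spaces.
    val allowed = projectionList.split(',')
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        .toSet()
    // Keep only the allowed fields, preserving the payload's field order.
    return value.filterKeys { it in allowed }
}
```

For example, with a projection list of `entityId, version`, a payload `{entityId: "order-1", version: 2, status: "PAID"}` would yield the `_id` `{entityId: "order-1", version: 2}`.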
And here is the DeleteManyWriteStrategy:
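```kotlin
package arneklein.connector.strategy

import com.mongodb.client.model.DeleteManyModel
import com.mongodb.client.model.Filters
import com.mongodb.client.model.WriteModel
import com.mongodb.kafka.connect.sink.converter.SinkDocument
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy
import org.apache.kafka.connect.errors.DataException
import org.bson.BsonDocument

// Deletes every document whose SHARED_ID_FIELD_NAME field equals the record key.
class DeleteManyWriteStrategy : WriteModelStrategy {
    override fun createWriteModel(sinkDocument: SinkDocument): WriteModel<BsonDocument> {
        // keyDoc is an Optional; a tombstone without a key cannot be mapped to a delete
        val keyDoc = sinkDocument.keyDoc.orElseThrow {
            DataException("Cannot build DeleteManyModel: the key document is missing")
        }
        val filter = Filters.eq(SHARED_ID_FIELD_NAME, keyDoc)
        return DeleteManyModel(filter)
    }

    companion object {
        // Placeholder name of the field that stores the Kafka key
        private const val SHARED_ID_FIELD_NAME = "some_field"
    }
}
```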
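To show what this combination is meant to achieve end to end, here is a dependency-free, in-memory model: each version is upserted under its own `_id` (as PartialValueStrategy would derive it from the payload), and a tombstone removes all documents whose shared field equals the key, which is the effect the DeleteManyModel above is meant to have in MongoDB. `VersionedStore`, the ids and the field values are hypothetical and only illustrate the semantics; this is not how the connector is implemented.

```kotlin
// Hypothetical in-memory model of the target collection: each document is
// stored under its own _id, so every version of an entity survives as a
// separate document until a tombstone for the entity's key arrives.
class VersionedStore(private val sharedField: String) {
    private val docs = linkedMapOf<String, Map<String, Any?>>()

    // Non-tombstone record: upsert by _id (taken from the payload).
    fun upsert(id: String, doc: Map<String, Any?>) {
        docs[id] = doc
    }

    // Tombstone: delete all documents whose shared field equals the key,
    // mirroring DeleteManyModel(Filters.eq(sharedField, key)). Returns the count.
    fun deleteMany(key: Any?): Int {
        val ids = docs.filterValues { it[sharedField] == key }.keys.toList()
        ids.forEach { docs.remove(it) }
        return ids.size
    }

    fun count(): Int = docs.size
}
```

For instance, after upserting `order-1-v1`, `order-1-v2` and `order-2-v1`, a tombstone for `order-1` would remove both `order-1` versions and leave only the `order-2` document.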
Please treat this only as a code sample. I can try to provide real code as well, but depending on how soon the next version is published, it might take me a little longer, since I'll have to publish my own build of the MongoDB connector in the meantime.
Great, thanks for the details! I created https://jira.mongodb.org/browse/KAFKA-320 to track this request. To set expectations on the timeline, it will be at least 3-6 months before we can address this ticket.