MongoDB sink connector conditional update

For the sink connector: if it determines that a record should be updated, is it possible to have the connector update the record only if a field in the Kafka message is greater than an existing field in MongoDB (e.g. by comparing timestamps)? Otherwise, it would skip the update and leave the record in MongoDB unchanged.

Thanks.

Hi @Dejan_Katanic,

That’s a good question. You’d have to write your own custom write model strategy. Returning a null value indicates a no-op; there’s an example in the documentation that should help get you started.

All the best,

Ross


Hi @Dejan_Katanic,

As @Ross_Lawley mentioned you can of course come up with any custom implementation for write model strategies that you might need for a specific use case. However, if I got you right, it might be your lucky day because somebody else - in this case my humble self - has written something for you. Either it fits as is for what you want to achieve or you can take it as a starting point and modify it to your needs.

My original sink connector code contains a write model strategy called MonotonicWritesDefaultStrategy, which makes use of a conditional update pipeline. Read about the feature here: GitHub - hpgrahsl/kafka-connect-mongodb: **Unofficial / Community** Kafka Connect MongoDB Sink Connect

The code for it can be found here: kafka-connect-mongodb/MonotonicWritesDefaultStrategy.java at master · hpgrahsl/kafka-connect-mongodb · GitHub. It’s probably not the most beautiful implementation, but hey, it did the job for me and others quite well back in 2019 already :)

NOTE: It needs MongoDB 4.2+ and Java driver 3.11+, since earlier versions of either lack support for the update pipeline syntax that is needed to perform the conditional checks during write operations.
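To give a rough idea of what such a conditional update pipeline can look like for the timestamp case from the question, here is a minimal sketch. It is not the code from MonotonicWritesDefaultStrategy. The class name ConditionalUpdateSketch, the method buildPipeline and the field name updatedAt are all made up for illustration, and the pipeline is built from plain java.util maps so the snippet has no dependencies. In a real write model strategy you would presumably convert this structure to BSON and pass it, together with a filter on _id and upsert enabled, to the driver's UpdateOneModel.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: builds the *shape* of a conditional update pipeline
// using plain maps. All names here are assumptions, not connector API.
public class ConditionalUpdateSketch {

    // Produces a single-stage pipeline:
    // [ { $replaceRoot: { newRoot: { $cond: { if: ..., then: ..., else: ... } } } } ]
    static List<Map<String, Object>> buildPipeline(String tsField, Map<String, Object> incoming) {
        Object incomingTs = Objects.requireNonNull(
                incoming.get(tsField), "incoming document has no field " + tsField);

        // if: the timestamp from the Kafka message is greater than the stored one.
        // "$<field>" refers to the value currently stored in MongoDB.
        Map<String, Object> condition = new LinkedHashMap<>();
        condition.put("$gt", List.of(incomingTs, "$" + tsField));

        Map<String, Object> cond = new LinkedHashMap<>();
        cond.put("if", condition);
        // then: replace the stored document with the incoming one. $literal keeps
        // string values starting with "$" from being read as field paths.
        // The incoming document must carry the same _id as the stored one.
        cond.put("then", Map.of("$literal", incoming));
        // else: keep the stored document as it is, i.e. effectively a no-op.
        cond.put("else", "$$ROOT");

        Map<String, Object> stage = new LinkedHashMap<>();
        stage.put("$replaceRoot", Map.of("newRoot", Map.of("$cond", cond)));
        return List.of(stage);
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("_id", 42);
        incoming.put("updatedAt", 1700000000000L);
        incoming.put("status", "shipped");
        System.out.println(buildPipeline("updatedAt", incoming));
    }
}
```

The comparison happens on the server as part of the write itself, so no separate read is needed before deciding whether to update. How a missing stored timestamp compares during an upsert is worth verifying against your MongoDB version before relying on it.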

Let me know if it’s helpful for you!
