I want to replicate a collection and keep it in sync in real time.
The CDC events are streamed to Kafka; I'll be listening to that topic and, based on `operationType`, processing each document and loading it into a Delta table. My table already contains every column that can appear, to cover schema changes in `fullDocument`.
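For context, the ingest side looks roughly like this. It's a simplified sketch: the broker, topic name, and the event schema (especially the `name`/`value` business columns) are placeholders, not my real ones.

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Simplified change-event schema; the real fullDocument has many more columns.
full_doc_schema = StructType([
    StructField("_id", StringType()),
    StructField("name", StringType()),     # placeholder business columns
    StructField("value", StringType()),
])
event_schema = StructType([
    StructField("operationType", StringType()),   # insert / update / replace / delete
    StructField("clusterTime", StringType()),
    StructField("documentKey", StructType([StructField("_id", StringType())])),
    StructField("fullDocument", full_doc_schema),
])

# Read the CDC events from Kafka and parse the JSON payload.
events = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")   # placeholder broker
        .option("subscribe", "cdc-topic")                    # placeholder topic
        .option("startingOffsets", "earliest")
        .load()
        .select(F.from_json(F.col("value").cast("string"), event_schema).alias("e"))
        .select("e.*")
)
```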
I am working with PySpark in Databricks. I have tried a couple of different approaches:
- Using `foreachBatch` with `clusterTime` for ordering. This requires me to do a `collect` and then process the events one by one, which was too slow (a rough sketch of what this looked like is below).
- Using an SCD-style approach where, instead of deleting any record, I mark it inactive. This does not give proper history tracking, because for each `_id` I take only the latest change in the batch and process that (second sketch below). The issue I am facing with this: the source team has told me that I can get an insert event for an `_id` after a delete event for the same `_id`. So if my batch contains the events "update → delete → insert" for one `_id`, then by taking the latest change I will pick the insert, and this causes a duplicate record in my table.
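The first approach had roughly this shape (building on the `events` stream above). It's a simplified sketch: the target table name, checkpoint path, and the per-event write logic stand in for my real ones.

```python
TARGET = "main.cdc.target_table"          # placeholder Delta table name

def process_batch_ordered(batch_df, batch_id):
    # Sort the whole micro-batch by clusterTime and pull it onto the driver,
    # then apply events one at a time. The collect() plus per-event writes
    # are what made this approach far too slow.
    for row in batch_df.orderBy("clusterTime").collect():
        doc_id = row["documentKey"]["_id"]
        # Remove any existing version of the document first...
        spark.sql(f"DELETE FROM {TARGET} WHERE _id = '{doc_id}'")
        if row["operationType"] != "delete":
            # ...then re-append the new state for insert / update / replace.
            (spark.createDataFrame([row["fullDocument"].asDict()])
                  .write.mode("append").saveAsTable(TARGET))

(events.writeStream
    .foreachBatch(process_batch_ordered)
    .option("checkpointLocation", "/tmp/checkpoints/cdc_ordered")  # placeholder
    .start())
```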
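The second (SCD-style) approach is roughly the following: deduplicate to the latest event per `_id` by `clusterTime`, then merge, marking deletes inactive instead of removing them. Again this is a simplified outline; `is_active`, `name`, `value`, the table name, and the checkpoint path are placeholders, and my real batch handler is more involved.

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

TARGET = "main.cdc.target_table"          # placeholder Delta table name

def process_batch_latest(batch_df, batch_id):
    # Keep only the latest event per _id in this micro-batch, ordered by clusterTime.
    w = Window.partitionBy(F.col("documentKey._id")).orderBy(F.col("clusterTime").desc())
    latest = (batch_df
              .withColumn("rn", F.row_number().over(w))
              .filter("rn = 1")
              .select(F.col("documentKey._id").alias("_id"),
                      "operationType",
                      "fullDocument"))

    (DeltaTable.forName(spark, TARGET).alias("t")
        .merge(latest.alias("s"), "t._id = s._id")
        # Soft delete: mark the existing row inactive instead of removing it.
        .whenMatchedUpdate(
            condition="s.operationType = 'delete'",
            set={"is_active": F.lit(False)})
        # Insert / update / replace for an existing _id: overwrite and reactivate.
        .whenMatchedUpdate(
            condition="s.operationType <> 'delete'",
            set={"name": "s.fullDocument.name",        # placeholder columns
                 "value": "s.fullDocument.value",
                 "is_active": F.lit(True)})
        # Brand-new _id: insert as an active row.
        .whenNotMatchedInsert(
            condition="s.operationType <> 'delete'",
            values={"_id": "s._id",
                    "name": "s.fullDocument.name",
                    "value": "s.fullDocument.value",
                    "is_active": F.lit(True)})
        .execute())

(events.writeStream
    .foreachBatch(process_batch_latest)
    .option("checkpointLocation", "/tmp/checkpoints/cdc_latest")  # placeholder
    .start())
```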
What would be the best way to handle this?