Kafka Source Connector - Recovering from downtime

I'm struggling with planning my data bootstrap because I may lose some data. I was thinking of doing a two-step bootstrap, something like this:

1 - Load the existing Mongo data at D-1 (D = go-live date; D-1 means the day right before go-live) using a connector with copy.existing = true (see the sketch after this list).
2 - After all the data is loaded, I drop the connector.
3 - At D-0 I want to start collecting live data, so I create a new connector without the copy.existing property.
- It's assumed that between D-1 and D-0 users worked on the application normally.
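
For reference, here is a minimal sketch of what the step-1 connector could look like in ksqlDB. The connector name is just illustrative and the {…} values are placeholders, mirroring the config shown further down; copy.existing = 'true' tells the MongoDB source connector to copy the existing collection before it starts streaming changes:

CREATE SOURCE CONNECTOR `bootstrap-mongo-sk-campaigns` WITH (
  -- illustrative name and placeholder values; same source properties as the config below
  "connector.class" = 'com.mongodb.kafka.connect.MongoSourceConnector',
  "connection.uri" = '{uri}',
  "database" = '{db}',
  "collection" = '{collection}',
  "topic.prefix" = '{myprefix}',
  -- snapshot the existing documents first, then continue with the change stream
  "copy.existing" = 'true'
);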

Using this strategy, my expectation was that I would be able to collect the changes made between dropping the first connector and creating the new one a day later. Unfortunately, that didn't happen.

So, I tried a new test.

1 - Created a connector without copy.existing.
2 - Made some changes to the DB and validated that the data flowed to Kafka successfully.
3 - Then I shut down my ksqlDB server (I use the Kafka Connect embedded in ksqlDB).
4 - Made some changes to some DB collections.
5 - Started the ksqlDB server up again.
6 - After the connector was working again (see the check below), I noticed that the data from step 4 wasn't recovered (data changed after the restart was fine).
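
For reference, the connector state after the restart can be checked from the ksqlDB CLI with standard statements, using the connector name from the config below:

SHOW CONNECTORS;
DESCRIBE CONNECTOR `live-bckp-mongo-sk-campaigns`;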

So, my question is: does the Kafka source connector handle downtime, or is there some property missing from my connector config?

CREATE SOURCE CONNECTOR `live-bckp-mongo-sk-campaigns` WITH (
  "connector.class" = 'com.mongodb.kafka.connect.MongoSourceConnector',
  "connection.uri" = '{uri}',
  "database" = '{db}',
  "collection" = '{collection}',
  "topic.prefix" = '{myprefix}',
  "change.stream.full.document" = 'updateLookup',
  "output.format.value" = 'schema',
  "output.schema.value" = '{myschema}'
);

Why drop the connector itself? Why not just stop it? The source connector uses MongoDB change streams under the covers. When you start the connector for the first time, it captures the _id of the change event (the resume token) and stores it in one of two different locations, depending on the topology of the Kafka Connect environment:

  • Standalone: the resume token is stored in a file specified by the offset.storage.file.filename property.
  • Distributed: the resume token is stored in a topic specified by offset.storage.topic (both are illustrated below).
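
For illustration, these are worker-level settings rather than connector properties; the file path and topic name below are just placeholders. With ksqlDB's embedded Connect they would typically live in the worker properties file referenced by the ksql.connect.worker.config server setting:

# Standalone worker: offsets (including the resume token) are kept in a local file
offset.storage.file.filename=/tmp/connect.offsets

# Distributed worker: offsets are kept in a compacted Kafka topic
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1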

As events come in from the source, the latest resume token is kept up to date in these locations.

Now, if you stop the connector and some time goes by, then upon restart the connector will read the last stored resume token, read events from that point all the way up to the current event, and effectively catch up to the present.
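
As a sketch of the "stop it rather than drop it" approach: the Kafka Connect REST API, which also backs ksqlDB's embedded Connect, can pause and later resume a connector while keeping its offsets. The host and port below assume the embedded worker's default REST listener on localhost:8083:

# Pause the connector; its offsets and resume token are kept
curl -X PUT http://localhost:8083/connectors/live-bckp-mongo-sk-campaigns/pause

# Resume it later; it catches up from the last stored resume token
curl -X PUT http://localhost:8083/connectors/live-bckp-mongo-sk-campaigns/resume

# Check its current state
curl http://localhost:8083/connectors/live-bckp-mongo-sk-campaigns/status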

When you delete the connector, you are deleting the resume token, so when you recreate the connector and start it, it doesn't know about the past.

Do we have any documentation on how to deploy the MongoDB connector in distributed mode on Kubernetes?