How much past data can copy.existing pull through change streams?

I’m planning to use Kafka Connect to sync data between two systems, with MongoDB as the source and copy.existing set in the connector configuration to sync past data.

I know change streams can pull past data with this config. We have around 34GB of data, going back over the last year. Can the change stream pull the data from the beginning? How far back does the data available to change streams go?

copy.existing opens a change stream at the start, marking the current time. It then copies the data via an aggregation query and, when that completes, starts a new change stream, passing the resume token captured at the start. This way we don’t lose any events while the data is being copied.
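To make that sequence concrete, here is a rough Python sketch of the same three steps. It is not the connector's actual (Java) implementation. It assumes `collection` behaves like a PyMongo `Collection` (`watch()`, `aggregate()`), and `handle` is a hypothetical callback standing in for "publish to Kafka". Wrapping copied documents as insert-style events is also an assumption of this sketch.

```python
def copy_existing_then_stream(collection, handle):
    """Illustrative outline of the copy.existing sequence.

    `collection` is assumed to behave like a pymongo Collection;
    `handle` is a hypothetical callback that receives each event.
    """
    # 1. Open a change stream first and remember where it starts.
    with collection.watch() as marker:
        start_token = marker.resume_token

    # 2. Copy the data that already exists via an aggregation query.
    #    An empty pipeline returns every document in the collection.
    for doc in collection.aggregate([]):
        # Assumption: copied documents are passed on as insert-style events.
        handle({"operationType": "insert", "fullDocument": doc})

    # 3. Resume from the saved token, so changes made while the copy
    #    was running are replayed rather than lost.
    with collection.watch(resume_after=start_token) as stream:
        for event in stream:
            handle(event)
```

Against a real deployment the last loop keeps waiting for new events, so it would normally run until the process is stopped. The point is that existing data comes from the aggregation query, not from the change stream itself.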

@Robert_Walters Hi Robert. I have been trying to use the MongoDB Source Connector in Kafka Connect. I tried copy.existing, but it only pushes newly inserted data, not the data that is already present.
Please let me know if there is a workaround for this.

Thanks,
Kunal

It sounds like your config file isn’t correct. Can you share your source config?

Hi @Robert_Walters,

```
{
  "name": "MongoSourceConnectorConnector_0",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connection.uri": "",
    "database": "demo",
    "collection": "identity",
    "pipeline": "[ { '$match': { 'operationType': {'$in': ['insert', 'update', 'replace'], } } }, { '$project': { '_id': 1, 'fullDocument': 1, 'ns': 1, } } ]",
    "publish.full.document.only": "true",
    "topic.namespace.map": "{\"*\":\"demo.identity\"}",
    "copy.existing": "true"
  }
}
```

You can delete MongoSourceConnectorConnector_0 and recreate it. Note that if you have used this same configuration previously and just restarted the connector, the resume token is stored, so it won’t copy from the beginning. Also, setting offset.partition.name to a new value ensures that the old resume token does not get used.
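As a small illustration of the offset.partition.name suggestion, this Python sketch builds an updated connector definition that you could then submit when recreating the connector. The helper name `with_new_offset_partition` and the value `identity-copy-v2` are made up for the example, and the config is trimmed to a few of the fields from the post above.

```python
import copy
import json


def with_new_offset_partition(connector, partition_name):
    """Return a copy of a connector definition with offset.partition.name set."""
    updated = copy.deepcopy(connector)
    updated["config"]["offset.partition.name"] = partition_name
    return updated


connector = {
    "name": "MongoSourceConnectorConnector_0",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "database": "demo",
        "collection": "identity",
        "copy.existing": "true",
    },
}

# "identity-copy-v2" is a hypothetical value; use any name not used before.
fresh = with_new_offset_partition(connector, "identity-copy-v2")
print(json.dumps(fresh, indent=2))
```

The original definition is left untouched, so you can compare the two before recreating the connector.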