Kafka Source Connector: Copy Existing restarts from scratch if interrupted

Hello,

We are using the Kafka source connector and we have noticed an unexpected issue recently.

We set the copy.existing option to true to perform an initial load of the whole MongoDB collection before switching to the change stream. This works fine most of the time. But recently we had an issue causing our connectors to restart frequently (1-2 times per hour). We noticed that after the restart, if the copy was not finished yet, it started copying everything again. In the end our Kafka topic was filled with hundreds of copies of the same data while the copy process never ended.

While we can try to avoid restarts as much as possible, it would be great if the copy instead resumed from where it left off. For huge and long initial loads, a single restart can have a big impact.

Do you know if there is anything we can do to prevent that problem? Is that supposed to happen? Is there a plan to improve that behaviour?

Thanks for your help

Hi Colin, This is by design as the connector has no idea where within the copy process it currently is. Thus, the connector itself can’t really resume upon failure. Is your scenario to just replicate MongoDB data between clusters? How large are these collections that are causing it to take a long time to do the initial copy?

Hi Robert,

Thanks for your answer.

Is your scenario to just replicate MongoDB data between clusters?

We do not really need to replicate the data to another MongoDB cluster. We need to ingest the data into Kafka so that it can be used by other tools on our side that work specifically with Kafka. So we can’t bypass Kafka if that’s what you were suggesting.

How large are these collections that are causing it to take a long time to do the initial copy?

Around 50M records and several hundred GBs of data. We could have more in the future.

This is by design as the connector has no idea where within the copy process it currently is. Thus, the connector itself can’t really resume upon failure.

Would it make sense to consider handling this failure scenario? I mean, is it an explicit design choice to not handle it, or is it simply too complex / impossible to handle it? Would it be hard to keep track of where the copy process currently is?

Note that my knowledge of MongoDB is pretty limited, so I’m only trying to understand the limitations here so that we can evaluate our options on our side. I understand of course that this is probably not a trivial problem.

In terms of supporting failures for copy.existing, as the code is today, it would be hard to keep track of where the copy process currently is. Perhaps you could watch for a failed connector, for example with curl:

# List every connector with its type, name, connector state, task states and class
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
  jq -r 'to_entries[] | [.value.info.type, .key, .value.status.connector.state, .value.status.tasks[].state, .value.info.config."connector.class"] | join(":|:")' | \
  column -s : -t | sort

and then restart the process, all outside the connector, perhaps through an automation tool.
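
As a rough sketch of what such automation might look like, the script below polls the Kafka Connect REST API for one connector and restarts any task reported as FAILED. The connector name, the CONNECT_URL default and the POLL_SECONDS interval are placeholders chosen for illustration, not values from this thread, and jq is assumed to be installed. Note that this only automates the restart: as discussed above, an interrupted copy.existing run still starts over from the beginning.

```shell
#!/usr/bin/env bash
# Sketch: restart FAILED tasks of one Kafka Connect connector via the REST API.
# CONNECT_URL, POLL_SECONDS and the connector name are illustrative assumptions.

CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the REST endpoint that restarts a single task of a connector.
task_restart_url() {
  local name="$1" task_id="$2"
  printf '%s/connectors/%s/tasks/%s/restart\n' "$CONNECT_URL" "$name" "$task_id"
}

# Read a connector status document on stdin and print the ids of FAILED tasks.
failed_task_ids() {
  jq -r '.tasks[]? | select(.state == "FAILED") | .id'
}

# Check the connector once and restart every task that is in the FAILED state.
restart_failed_tasks() {
  local name="$1" id
  curl -s "$CONNECT_URL/connectors/$name/status" | failed_task_ids |
    while read -r id; do
      echo "restarting task $id of $name"
      curl -s -X POST "$(task_restart_url "$name" "$id")"
    done
}

# Poll forever only when a connector name is passed on the command line.
if [ "$#" -ge 1 ]; then
  while true; do
    restart_failed_tasks "$1"
    sleep "${POLL_SECONDS:-60}"
  done
fi
```

Saved as, say, watch-connector.sh (a hypothetical file name), it could be run as `./watch-connector.sh my-mongo-source`, where my-mongo-source stands in for your connector's name. A cron job or an orchestration tool calling restart_failed_tasks once per run would work just as well as the built-in loop.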

Feel free to file a JIRA ticket on this feature request.

Thanks for the suggestions. I have opened a JIRA ticket: KAFKA-315, and we will look for other workarounds in the meantime.