MongoDB Kafka Connect error: org.apache.kafka.connect.errors.ConnectException: Unexpected error: Cannot invoke "com.mongodb.client.MongoChangeStreamCursor.tryNext()" because "this.cursor" is null

Hi,

I am getting the following error while running the MongoDB Kafka source connector:

(com.mongodb.kafka.connect.source.MongoSourceTask:458)
[2023-07-27 18:24:25,165] ERROR [mongodb-source-connector|task-0] WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Unexpected error: Cannot invoke "com.mongodb.client.MongoChangeStreamCursor.tryNext()" because "this.cursor" is null
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.getNextBatch(StartedMongoSourceTask.java:597)
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.pollInternal(StartedMongoSourceTask.java:211)
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.poll(StartedMongoSourceTask.java:188)
	at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:173)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:462)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:351)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:857)
Caused by: java.lang.NullPointerException: Cannot invoke "com.mongodb.client.MongoChangeStreamCursor.tryNext()" because "this.cursor" is null
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.getNextBatch(StartedMongoSourceTask.java:579)
	... 14 more

Here is my source connector configuration:

name=mongodb-source-connector
connector.class=com.mongodb.kafka.connect.MongoSourceConnector

tasks.max=1
connection.uri=mongodb://localhost:27017
database=productDb
collection=products

key.converter=org.apache.kafka.connect.storage.StringConverter
key.field=_id

value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
topic=output-topic

poll.max.batch.size=1000
poll.await.time.ms=500

initial.sync.source=true

MongoDB version:

db version v6.0.6
Build Info: {
    "version": "6.0.6",
    "gitVersion": "26b4851a412cc8b9b4a18cdb6cd0f9f642e06aa7",
    "modules": [],
    "allocator": "system",
    "environment": {
        "distarch": "x86_64",
        "target_arch": "x86_64"
    }
}

MongoDB Kafka connector jar: mongo-kafka-connect-1.10.1-confluent.jar

The Kafka Connect worker runs in standalone mode (connect-standalone.properties).

We have been hitting the same error with a DocumentDB connector.

We tried changing the offset name by setting the `offset.partition.name` property, but this did not help. The connector would run if we set `errors.tolerance=all`; however, we did not want to leave that property enabled long-term, and the connector broke with the same error as soon as we removed it. Both of these suggestions came from here: https://www.mongodb.com/docs/kafka-connector/current/troubleshooting/recover-from-invalid-resume-token/#invalid-resume-token
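For reference, those two workarounds map to the following connector properties. This is a sketch: the partition name value is an arbitrary new, unused string, and the property names come from the troubleshooting page linked above.

```
# Workaround 1: start from a fresh offset partition (any new, unused name).
offset.partition.name=mongodb-source-connector-v2

# Workaround 2: tolerate the invalid resume token. Note this tolerates
# all errors, which is why we did not want to leave it enabled.
errors.tolerance=all
```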

We’re hoping for a more elegant solution, but we did find that redeploying the connector under an entirely new name worked (or at least has so far).
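The troubleshooting guide's third option, manually removing the old offset, amounts in standalone mode to deleting the file configured by `offset.storage.file.filename` in the worker properties. A sketch, assuming a hypothetical example path (adjust to your worker config):

```shell
# Stop the standalone worker first, then delete the stored offsets.
# /tmp/connect.offsets is a hypothetical path; use the value of
# offset.storage.file.filename from your worker properties.
rm -f /tmp/connect.offsets
```

In distributed mode the offsets live in the internal offsets topic instead, so this file-based approach does not apply there.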

Hi @link2anjan_N_A,

Are there any log messages regarding the cursor / MongoDB that occur before that error? I want to understand whether the connector was mid-shutdown or in normal running when it happened.

@Mike_Ray - there is the `mongo.errors.tolerance` setting, which applies just to the connector.

I’ve added KAFKA-383 to track this.

Ross


Please find the log details:

[2023-07-27 18:44:38,943] INFO [mongo-source|task-0] These configurations '[metrics.context.connect.kafka.cluster.id]' were supplied but are not used yet. (org.apache.kafka.clients.producer.ProducerConfig:378)
[2023-07-27 18:44:38,944] INFO [mongo-source|task-0] Kafka version: 3.5.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2023-07-27 18:44:38,944] INFO [mongo-source|task-0] Kafka commitId: unknown (org.apache.kafka.common.utils.AppInfoParser:120)
[2023-07-27 18:44:38,945] INFO [mongo-source|task-0] Kafka startTimeMs: 1690463678944 (org.apache.kafka.common.utils.AppInfoParser:121)
[2023-07-27 18:44:38,960] INFO [mongo-source|task-0] [Producer clientId=connector-producer-mongo-source-0] Cluster ID: tzhR2bbzT76vdhT3DONr9A (org.apache.kafka.clients.Metadata:287)
[2023-07-27 18:44:38,961] INFO [mongo-source|task-0] Starting MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:109)
[2023-07-27 18:44:38,962] INFO Created connector mongo-source (org.apache.kafka.connect.cli.ConnectStandalone:76)
[2023-07-27 18:44:38,998] INFO [mongo-source|task-0] MongoClient with metadata {"driver": {"name": "mongo-java-driver|sync|mongo-kafka|source", "version": "4.7.2|1.10.1"}, "os": {"type": "Darwin", "name": "Mac OS X", "architecture": "x86_64", "version": "12.6.7"}, "platform": "Java/IBM Corporation/17.0.8+5"} created with settings MongoClientSettings{readPreference=primary, writeConcern=WriteConcern{w=null, wTimeout=null ms, journal=null}, retryWrites=true, retryReads=true, readConcern=ReadConcern{level=null}, credential=null, streamFactoryFactory=null, commandListeners=[com.mongodb.kafka.connect.source.MongoSourceTask$1@54efa557], codecRegistry=ProvidersCodecRegistry{codecProviders=[ValueCodecProvider{}, BsonValueCodecProvider{}, DBRefCodecProvider{}, DBObjectCodecProvider{}, DocumentCodecProvider{}, IterableCodecProvider{}, MapCodecProvider{}, GeoJsonCodecProvider{}, GridFSFileCodecProvider{}, Jsr310CodecProvider{}, JsonObjectCodecProvider{}, BsonCodecProvider{}, EnumCodecProvider{}, com.mongodb.Jep395RecordCodecProvider@d4e9fa70]}, clusterSettings={hosts=[localhost:27017], srvServiceName=mongodb, mode=SINGLE, requiredClusterType=UNKNOWN, requiredReplicaSetName='null', serverSelector='null', clusterListeners='[]', serverSelectionTimeout='30000 ms', localThreshold='30000 ms'}, socketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=0, receiveBufferSize=0, sendBufferSize=0}, heartbeatSocketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=10000, receiveBufferSize=0, sendBufferSize=0}, connectionPoolSettings=ConnectionPoolSettings{maxSize=100, minSize=0, maxWaitTimeMS=120000, maxConnectionLifeTimeMS=0, maxConnectionIdleTimeMS=0, maintenanceInitialDelayMS=0, maintenanceFrequencyMS=60000, connectionPoolListeners=[], maxConnecting=2}, serverSettings=ServerSettings{heartbeatFrequencyMS=10000, minHeartbeatFrequencyMS=500, serverListeners='[]', serverMonitorListeners='[]'}, sslSettings=SslSettings{enabled=false, invalidHostNameAllowed=false, 
context=null}, applicationName='null', compressorList=[], uuidRepresentation=UNSPECIFIED, serverApi=null, autoEncryptionSettings=null, contextProvider=null} (org.mongodb.driver.client:71)
[2023-07-27 18:44:39,004] INFO [mongo-source|task-0] Opened connection [connectionId{localValue:3, serverValue:184}] to localhost:27017 (org.mongodb.driver.connection:71)
[2023-07-27 18:44:39,005] INFO [mongo-source|task-0] Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=17, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=7190556} (org.mongodb.driver.cluster:71)
[2023-07-27 18:44:39,006] INFO [mongo-source|task-0] Watching for collection changes on 'productDb.products' (com.mongodb.kafka.connect.source.MongoSourceTask:637)
[2023-07-27 18:44:39,005] INFO [mongo-source|task-0] Opened connection [connectionId{localValue:4, serverValue:185}] to localhost:27017 (org.mongodb.driver.connection:71)
[2023-07-27 18:44:39,061] INFO [mongo-source|task-0] New change stream cursor created without offset. (com.mongodb.kafka.connect.source.MongoSourceTask:417)
[2023-07-27 18:44:39,093] INFO [mongo-source|task-0] Opened connection [connectionId{localValue:5, serverValue:186}] to localhost:27017 (org.mongodb.driver.connection:71)
[2023-07-27 18:44:39,114] WARN [mongo-source|task-0] Failed to resume change stream: The $changeStream stage is only supported on replica sets 40573

=====================================================================================
If the resume token is no longer available then there is the potential for data loss.
Saved resume tokens are managed by Kafka and stored with the offset data.

To restart the change stream with no resume token either: 
  * Create a new partition name using the `offset.partition.name` configuration.
  * Set `errors.tolerance=all` and ignore the erroring resume token. 
  * Manually remove the old offset from its configured storage.

Resetting the offset will allow for the connector to be resume from the latest resume
token. Using `startup.mode = copy_existing` ensures that all data will be outputted by the
connector but it will duplicate existing data.
=====================================================================================
 (com.mongodb.kafka.connect.source.MongoSourceTask:458)
[2023-07-27 18:44:39,116] INFO [mongo-source|task-0] Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:163)
[2023-07-27 18:44:39,117] INFO [mongo-source|task-0] WorkerSourceTask{id=mongo-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:275)
[2023-07-27 18:44:39,122] INFO [mongo-source|task-0] Watching for collection changes on 'productDb.products' (com.mongodb.kafka.connect.source.MongoSourceTask:637)
[2023-07-27 18:44:39,127] INFO [mongo-source|task-0] New change stream cursor created without offset. (com.mongodb.kafka.connect.source.MongoSourceTask:417)
[2023-07-27 18:44:39,132] WARN [mongo-source|task-0] Failed to resume change stream: The $changeStream stage is only supported on replica sets 40573

=====================================================================================
If the resume token is no longer available then there is the potential for data loss.
Saved resume tokens are managed by Kafka and stored with the offset data.

To restart the change stream with no resume token either: 
  * Create a new partition name using the `offset.partition.name` configuration.
  * Set `errors.tolerance=all` and ignore the erroring resume token. 
  * Manually remove the old offset from its configured storage.

Resetting the offset will allow for the connector to be resume from the latest resume
token. Using `startup.mode = copy_existing` ensures that all data will be outputted by the
connector but it will duplicate existing data.
=====================================================================================
 (com.mongodb.kafka.connect.source.MongoSourceTask:458)
[2023-07-27 18:44:39,135] ERROR [mongo-source|task-0] WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Unexpected error: Cannot invoke "com.mongodb.client.MongoChangeStreamCursor.tryNext()" because "this.cursor" is null
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.getNextBatch(StartedMongoSourceTask.java:597)
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.pollInternal(StartedMongoSourceTask.java:211)
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.poll(StartedMongoSourceTask.java:188)
	at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:173)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:462)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:351)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:857)
Caused by: java.lang.NullPointerException: Cannot invoke "com.mongodb.client.MongoChangeStreamCursor.tryNext()" because "this.cursor" is null
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.getNextBatch(StartedMongoSourceTask.java:579)
	... 14 more
[2023-07-27 18:44:39,139] INFO [mongo-source|task-0] Stopping MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:178)
[2023-07-27 18:44:39,139] INFO [mongo-source|task-0] Stopping MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:337)
[2023-07-27 18:44:39,151] INFO [mongo-source|task-0] [Producer clientId=connector-producer-mongo-source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1310)
[2023-07-27 18:44:39,159] INFO [mongo-source|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:693)
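Note the WARN lines in the log above: this mongod is a STANDALONE server (see the ServerDescription line), and the `$changeStream` stage the source connector relies on is only supported on replica sets. A minimal sketch for a local dev setup, assuming you can restart mongod, is to run it as a one-node replica set:

```
# mongod.conf (sketch): enable a one-node replica set so change streams work
replication:
  replSetName: rs0
```

After restarting mongod with this setting, run `rs.initiate()` once from mongosh to initialize the set.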

Please find the log details:

[2023-07-28 15:36:08,635] INFO [mongodb-source-connector|task-0] These configurations '[metrics.context.connect.kafka.cluster.id]' were supplied but are not used yet. (org.apache.kafka.clients.producer.ProducerConfig:378)
[2023-07-28 15:36:08,636] INFO [mongodb-source-connector|task-0] Kafka version: 3.5.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2023-07-28 15:36:08,636] INFO [mongodb-source-connector|task-0] Kafka commitId: unknown (org.apache.kafka.common.utils.AppInfoParser:120)
[2023-07-28 15:36:08,636] INFO [mongodb-source-connector|task-0] Kafka startTimeMs: 1690538768635 (org.apache.kafka.common.utils.AppInfoParser:121)
[2023-07-28 15:36:08,656] INFO [mongodb-source-connector|task-0] [Producer clientId=connector-producer-mongodb-source-connector-0] Cluster ID: tzhR2bbzT76vdhT3DONr9A (org.apache.kafka.clients.Metadata:287)
[2023-07-28 15:36:08,657] INFO [mongodb-source-connector|task-0] Starting MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:109)
[2023-07-28 15:36:08,664] INFO Created connector mongodb-source-connector (org.apache.kafka.connect.cli.ConnectStandalone:76)
[2023-07-28 15:36:08,702] INFO [mongodb-source-connector|task-0] MongoClient with metadata {"driver": {"name": "mongo-java-driver|sync|mongo-kafka|source", "version": "4.7.2|1.10.1"}, "os": {"type": "Darwin", "name": "Mac OS X", "architecture": "x86_64", "version": "12.6.7"}, "platform": "Java/IBM Corporation/17.0.8+5"} created with settings MongoClientSettings{readPreference=primary, writeConcern=WriteConcern{w=null, wTimeout=null ms, journal=null}, retryWrites=true, retryReads=true, readConcern=ReadConcern{level=null}, credential=null, streamFactoryFactory=null, commandListeners=[com.mongodb.kafka.connect.source.MongoSourceTask$1@8d88f514], codecRegistry=ProvidersCodecRegistry{codecProviders=[ValueCodecProvider{}, BsonValueCodecProvider{}, DBRefCodecProvider{}, DBObjectCodecProvider{}, DocumentCodecProvider{}, IterableCodecProvider{}, MapCodecProvider{}, GeoJsonCodecProvider{}, GridFSFileCodecProvider{}, Jsr310CodecProvider{}, JsonObjectCodecProvider{}, BsonCodecProvider{}, EnumCodecProvider{}, com.mongodb.Jep395RecordCodecProvider@e1134d6c]}, clusterSettings={hosts=[localhost:27017], srvServiceName=mongodb, mode=SINGLE, requiredClusterType=UNKNOWN, requiredReplicaSetName='null', serverSelector='null', clusterListeners='[]', serverSelectionTimeout='30000 ms', localThreshold='30000 ms'}, socketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=0, receiveBufferSize=0, sendBufferSize=0}, heartbeatSocketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=10000, receiveBufferSize=0, sendBufferSize=0}, connectionPoolSettings=ConnectionPoolSettings{maxSize=100, minSize=0, maxWaitTimeMS=120000, maxConnectionLifeTimeMS=0, maxConnectionIdleTimeMS=0, maintenanceInitialDelayMS=0, maintenanceFrequencyMS=60000, connectionPoolListeners=[], maxConnecting=2}, serverSettings=ServerSettings{heartbeatFrequencyMS=10000, minHeartbeatFrequencyMS=500, serverListeners='[]', serverMonitorListeners='[]'}, sslSettings=SslSettings{enabled=false, 
invalidHostNameAllowed=false, context=null}, applicationName='null', compressorList=[], uuidRepresentation=UNSPECIFIED, serverApi=null, autoEncryptionSettings=null, contextProvider=null} (org.mongodb.driver.client:71)
[2023-07-28 15:36:08,707] INFO [mongodb-source-connector|task-0] Opened connection [connectionId{localValue:4, serverValue:193}] to localhost:27017 (org.mongodb.driver.connection:71)
[2023-07-28 15:36:08,707] INFO [mongodb-source-connector|task-0] Opened connection [connectionId{localValue:3, serverValue:194}] to localhost:27017 (org.mongodb.driver.connection:71)
[2023-07-28 15:36:08,708] INFO [mongodb-source-connector|task-0] Watching for collection changes on 'productDb.products' (com.mongodb.kafka.connect.source.MongoSourceTask:637)
[2023-07-28 15:36:08,708] INFO [mongodb-source-connector|task-0] Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=17, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=5418304} (org.mongodb.driver.cluster:71)
[2023-07-28 15:36:08,762] INFO [mongodb-source-connector|task-0] New change stream cursor created without offset. (com.mongodb.kafka.connect.source.MongoSourceTask:417)
[2023-07-28 15:36:08,793] INFO [mongodb-source-connector|task-0] Opened connection [connectionId{localValue:5, serverValue:195}] to localhost:27017 (org.mongodb.driver.connection:71)
[2023-07-28 15:36:08,828] WARN [mongodb-source-connector|task-0] Failed to resume change stream: The $changeStream stage is only supported on replica sets 40573

=====================================================================================
If the resume token is no longer available then there is the potential for data loss.
Saved resume tokens are managed by Kafka and stored with the offset data.

To restart the change stream with no resume token either: 
  * Create a new partition name using the `offset.partition.name` configuration.
  * Set `errors.tolerance=all` and ignore the erroring resume token. 
  * Manually remove the old offset from its configured storage.

Resetting the offset will allow for the connector to be resume from the latest resume
token. Using `startup.mode = copy_existing` ensures that all data will be outputted by the
connector but it will duplicate existing data.
=====================================================================================
 (com.mongodb.kafka.connect.source.MongoSourceTask:458)
[2023-07-28 15:36:08,830] INFO [mongodb-source-connector|task-0] Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:163)
[2023-07-28 15:36:08,830] INFO [mongodb-source-connector|task-0] WorkerSourceTask{id=mongodb-source-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:275)
[2023-07-28 15:36:08,834] INFO [mongodb-source-connector|task-0] Watching for collection changes on 'productDb.products' (com.mongodb.kafka.connect.source.MongoSourceTask:637)
[2023-07-28 15:36:08,838] INFO [mongodb-source-connector|task-0] New change stream cursor created without offset. (com.mongodb.kafka.connect.source.MongoSourceTask:417)
[2023-07-28 15:36:08,842] WARN [mongodb-source-connector|task-0] Failed to resume change stream: The $changeStream stage is only supported on replica sets 40573

=====================================================================================
If the resume token is no longer available then there is the potential for data loss.
Saved resume tokens are managed by Kafka and stored with the offset data.

To restart the change stream with no resume token either: 
  * Create a new partition name using the `offset.partition.name` configuration.
  * Set `errors.tolerance=all` and ignore the erroring resume token. 
  * Manually remove the old offset from its configured storage.

Resetting the offset will allow for the connector to be resume from the latest resume
token. Using `startup.mode = copy_existing` ensures that all data will be outputted by the
connector but it will duplicate existing data.
=====================================================================================
 (com.mongodb.kafka.connect.source.MongoSourceTask:458)
[2023-07-28 15:36:08,846] ERROR [mongodb-source-connector|task-0] WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Unexpected error: Cannot invoke "com.mongodb.client.MongoChangeStreamCursor.tryNext()" because "this.cursor" is null
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.getNextBatch(StartedMongoSourceTask.java:597)
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.pollInternal(StartedMongoSourceTask.java:211)
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.poll(StartedMongoSourceTask.java:188)
	at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:173)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:462)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:351)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:857)
Caused by: java.lang.NullPointerException: Cannot invoke "com.mongodb.client.MongoChangeStreamCursor.tryNext()" because "this.cursor" is null
	at com.mongodb.kafka.connect.source.StartedMongoSourceTask.getNextBatch(StartedMongoSourceTask.java:579)
	... 14 more
[2023-07-28 15:36:08,849] INFO [mongodb-source-connector|task-0] Stopping MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:178)
[2023-07-28 15:36:08,849] INFO [mongodb-source-connector|task-0] Stopping MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:337)
[2023-07-28 15:36:08,860] INFO [mongodb-source-connector|task-0] [Producer clientId=connector-producer-mongodb-source-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1310)
[2023-07-28 15:36:08,864] INFO [mongodb-source-connector|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:693)
[2023-07-28 15:36:08,865] INFO [mongodb-source-conn

there is the mongo.errors.tolerance setting for just the connector.

This section (https://www.mongodb.com/docs/kafka-connector/current/sink-connector/fundamentals/error-handling-strategies/#handle-errors-at-the-connector-level) makes it seem like it only applies to MongoDB errors; however, the related configuration page (https://www.mongodb.com/docs/kafka-connector/current/sink-connector/configuration-properties/error-handling/#std-label-sink-configuration-error-handling) makes it sound like it’s simply an override property that applies to all errors.

We did not want all errors to be silently ignored, which is why we did not leave that setting on. Can you confirm that mongo.errors.tolerance only applies to MongoDB-related errors?

If not, we will wait on the results of that ticket you opened – thanks for doing so.

Hi,

Thanks for the logs - it looks like the cursor couldn’t be restarted because the resume token is no longer there.

It shouldn’t throw an NPE though, and that will have to be fixed.

`mongo.errors.tolerance` relates to errors coming from the MongoDB connector and does not affect Kafka Connect’s own error-tolerance handling.
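In properties terms the two knobs are distinct. A sketch (`errors.tolerance` is the Kafka Connect framework property; `mongo.errors.tolerance` is the connector's own):

```
# Connector-level: tolerate only errors raised by the MongoDB connector.
mongo.errors.tolerance=all

# Framework-level: Kafka Connect tolerates errors across the whole task
# pipeline (converters, transforms, etc.).
errors.tolerance=all
```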

Ross