I am trying to use the MongoDB Source connector -
I have created a DB named testDB
and a collection named init.
Following is the connector config -
{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo:27017",
    "database": "testDB",
    "collection": "init"
  }
}
I have tried multiple different properties mentioned in the MongoDB connector docs, but I always get some error.
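For context, this is roughly how the connector is registered, a sketch assuming the Connect worker's REST endpoint is on localhost:8083 and the config above is saved as mongo-source-connector.json:

    # Submit the source connector config to the Kafka Connect REST API
    # (host/port and file name are assumptions; adjust for your setup)
    curl -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -d @mongo-source-connector.json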
Following is the error -
2023-07-05 18:43:27 [2023-07-05 13:13:27,026] ERROR WorkerSourceTask{id=mongo-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
2023-07-05 18:43:27 org.apache.kafka.connect.errors.ConnectException: Unexpected error: null
2023-07-05 18:43:27 at com.mongodb.kafka.connect.source.StartedMongoSourceTask.getNextBatch(StartedMongoSourceTask.java:597)
2023-07-05 18:43:27 at com.mongodb.kafka.connect.source.StartedMongoSourceTask.pollInternal(StartedMongoSourceTask.java:211)
2023-07-05 18:43:27 at com.mongodb.kafka.connect.source.StartedMongoSourceTask.poll(StartedMongoSourceTask.java:188)
2023-07-05 18:43:27 at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:173)
2023-07-05 18:43:27 at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:462)
2023-07-05 18:43:27 at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:351)
2023-07-05 18:43:27 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
2023-07-05 18:43:27 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
2023-07-05 18:43:27 at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
2023-07-05 18:43:27 at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
2023-07-05 18:43:27 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-07-05 18:43:27 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-07-05 18:43:27 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-07-05 18:43:27 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-07-05 18:43:27 at java.base/java.lang.Thread.run(Thread.java:829)
2023-07-05 18:43:27 Caused by: java.lang.NullPointerException
2023-07-05 18:43:27 at com.mongodb.kafka.connect.source.StartedMongoSourceTask.getNextBatch(StartedMongoSourceTask.java:579)
2023-07-05 18:43:27 ... 14 more
Following are the version details -
* confluentinc/cp-zookeeper: latest
* confluentinc/cp-kafka: latest
* confluentinc/cp-schema-registry: latest
* confluentinc/cp-kafka-connect: latest
* mongo: latest

Plugin:
* mongodb/kafka-connect-mongodb: 1.10.1
Just a note that the sink connector works fine
Can you try this with the latest version of the connector?
A null pointer issue was identified (https://jira.mongodb.org/browse/KAFKA-383) and it has been addressed.
Note, the Confluent Hub is currently being updated, so it might still show 1.10.1 as the latest; if it does, you can grab the latest from https://github.com/mongodb/mongo-kafka/releases/tag/r.11.0
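As a rough sketch, upgrading the plugin inside a cp-kafka-connect container usually looks something like the following (the 1.11.0 version and the plugin path handling are assumptions for your image; restart the Connect worker afterwards so it picks up the new version):

    # Install the newer MongoDB connector plugin via the Confluent Hub client
    # (run inside the cp-kafka-connect container, then restart the worker)
    confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.11.0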
@Robert_Walters I have updated to version 1.11.0 and I’m no longer getting the NullPointerException, but I notice an error “Unable to recreate the cursor”; this error keeps getting printed in a continuous loop in the logs, non-stop.
If the resume token is no longer available then there is the potential for data loss.
Saved resume tokens are managed by Kafka and stored with the offset data.
To restart the change stream with no resume token either:
* Create a new partition name using the `offset.partition.name` configuration.
* Set `errors.tolerance=all` and ignore the erroring resume token.
* Manually remove the old offset from its configured storage.
Resetting the offset will allow for the connector to be resume from the latest resume
token. Using `startup.mode = copy_existing` ensures that all data will be outputted by the
connector but it will duplicate existing data.
=====================================================================================
(com.mongodb.kafka.connect.source.MongoSourceTask)
[2023-08-24 08:38:18,471] INFO Unable to recreate the cursor (com.mongodb.kafka.connect.source.MongoSourceTask)
[2023-08-24 08:38:18,477] INFO Watching for collection changes on 'avd.vehicles' (com.mongodb.kafka.connect.source.MongoSourceTask)
[2023-08-24 08:38:18,478] INFO New change stream cursor created without offset. (com.mongodb.kafka.connect.source.MongoSourceTask)
[2023-08-24 08:38:18,480] WARN Failed to resume change stream: The $changeStream stage is only supported on replica sets 40573
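Of the options listed in that log message, creating a new offset partition name is the least invasive; a sketch of what that could look like, reusing the earlier example config and the assumed REST endpoint on localhost:8083 (the partition name value is arbitrary, and this does not address the replica-set warning above):

    # Give the task a fresh offset partition so it stops resuming from the stale token
    # (config values below mirror the earlier example, not the avd.vehicles setup)
    curl -X PUT http://localhost:8083/connectors/mongo-source-connector/config \
      -H "Content-Type: application/json" \
      -d '{
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": "mongodb://mongo:27017",
            "database": "testDB",
            "collection": "init",
            "offset.partition.name": "mongo-source-connector.v2"
          }'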
Are you using MongoDB or a third-party MongoDB API like CosmosDB, DocumentDB, etc.?
If MongoDB, do you have it running as a replica set or a single standalone instance?
@Robert_Walters Locally I’m running mongo:5.0.15 as a Docker container; it should be running as a single standalone instance.
Below is the snippet of our docker-compose.yaml file
documentdb:
  platform: ${PLATFORM:-linux/amd64}
  image: mongo:5.0.15
  restart: "unless-stopped"
  ports:
    - '27017:27017'
  environment:
    MONGO_INITDB_ROOT_USERNAME: <username>
    MONGO_INITDB_ROOT_PASSWORD: <password>
  healthcheck:
    test: echo 'db.runCommand("ping").ok' | mongo localhost:27017/productiondb --quiet
    interval: 10s
    timeout: 10s
    retries: 5
    start_period: 40s
That is why it doesn’t work: you need to run MongoDB as a replica set, because standalone (single-node) MongoDB instances do not have change streams. When you connect from the MongoDB Connector for Apache Kafka, it opens a change stream on the collection you specify.
If you are using this just as a test scenario, you can run a single-node replica set.
In your docker-compose file, under documentdb: (odd name), add this:
command: --replSet rs0
Then once it is up, connect and run this script:
rsconf = {
  _id: "rs0",
  members: [{ _id: 0, host: "mongo1:27017", priority: 1.0 }],
};
rs.initiate(rsconf);
rs.status();
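For example, one way to run that script is via docker compose exec with the legacy mongo shell bundled in the 5.0 image. This is only a sketch: the host value below assumes other containers reach MongoDB via the documentdb service name, and note that with root credentials configured, mongod will also expect a keyFile for internal auth once --replSet is enabled, which is not covered here.

    # Initiate the single-node replica set from the host (sketch; adjust credentials)
    docker compose exec documentdb mongo -u <username> -p <password> --authenticationDatabase admin --eval '
      rs.initiate({ _id: "rs0", members: [{ _id: 0, host: "documentdb:27017", priority: 1.0 }] });
      rs.status();
    '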
@Robert_Walters thank you, will try this
The service name is documentDB because we are running AWS DocumentDB in production. Will this be an issue?
No idea, DocumentDB is not MongoDB. Why not use MongoDB Atlas? It is available in the AWS Marketplace and you can use VPC Peering just like with DocumentDB.