MongoDB Kafka connector error while writing to sharded MongoDB via mongos instance

Hi All,

We are trying to pump messages from Kafka to MongoDB. Kafka is running on Ubuntu, while MongoDB is installed on RHEL 8 servers. The MongoDB deployment is sharded, and we have set up mongos instances to connect to it.

The load from Kafka to MongoDB via the Kafka connector fails with the following error when the connection URI points to a mongos instance:

Bulk write operation error on server. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]

However, the load succeeds when we set the connection URI to the MongoDB instances directly instead of mongos.
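For context, error code 61 corresponds to ShardKeyNotFound: when a write goes through mongos, an upsert has to be routed to exactly one shard, so its filter needs an equality match on every shard key field. As far as I understand it, the sink connector's default write model upserts by `_id` only, which would explain why the same load works against a direct connection (no routing step) but fails through mongos. Below is a simplified Python model of that routing rule. It is only a sketch of the idea, not the actual mongos logic, and the function name, the `customerId` shard key, and the sample filters are hypothetical.

```python
from typing import Any, Mapping, Sequence


def filter_targets_single_shard(
    query_filter: Mapping[str, Any], shard_key_fields: Sequence[str]
) -> bool:
    """Return True if the filter pins every shard key field to one exact value.

    Simplified model (assumption): a field is an exact match when its value
    is a literal or {"$eq": literal}; any other operator ($gt, $in, ...) is
    treated as not exact.
    """
    for field in shard_key_fields:
        if field not in query_filter:
            # A missing shard key field means the upsert cannot be routed.
            return False
        value = query_filter[field]
        if isinstance(value, Mapping) and any(
            str(key).startswith("$") for key in value
        ):
            # Operator document: only a lone $eq counts as an exact match.
            if set(value.keys()) != {"$eq"}:
                return False
    return True


# Hypothetical collection sharded on "customerId".
shard_key = ["customerId"]

# A filter on _id alone cannot be routed to one shard.
id_only = filter_targets_single_shard({"_id": 42}, shard_key)

# Adding the shard key to the filter makes the upsert targetable.
with_key = filter_targets_single_shard(
    {"_id": 42, "customerId": "c-7"}, shard_key
)
```

If this model matches what is happening here, the upsert filter produced by the connector is missing the collection's shard key, so the settings to look at would be the ones that control which fields end up in that filter.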

Please help

Full error:

/python-code-test-data-gen$ {"mongodb-sink-connector":{"status":{"name":"mongodb-sink-connector","connector":{"state":"RUNNING","worker_id":""},"tasks":[{"id":0,"state":"FAILED","worker_id":"","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: com.mongodb.MongoBulkWriteException: Bulk write operation error on server . Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]. \n\tat com.mongodb.kafka.connect.sink.StartedMongoSinkTask.handleTolerableWriteException(StartedMongoSinkTask.java:168)\n\tat com.mongodb.kafka.connect.sink.StartedMongoSinkTask.bulkWriteBatch(StartedMongoSinkTask.java:111)\n\tat java.base/java.util.ArrayList.forEach(ArrayList.java:1541)\n\tat com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:76)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)\n\t... 10 more\nCaused by: com.mongodb.MongoBulkWriteException: Bulk write operation error on server . Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]. \n\tat com.mongodb.internal.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:167)\n\tat com.mongodb.internal.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:192)\n\tat com.mongodb.internal.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:136)\n\tat com.mongodb.internal.operation.BulkWriteBatch.getResult(BulkWriteBatch.java:224)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:363)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$execute$2(MixedBulkWriteOperation.java:260)\n\tat com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:575)\n\tat com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)\n\tat com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:574)\n\tat com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)\n\tat com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:573)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$execute$3(MixedBulkWriteOperation.java:232)\n\tat com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:268)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:84)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:212)\n\tat com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:443)\n\tat com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:423)\n\tat com.mongodb.kafka.connect.sink.StartedMongoSinkTask.bulkWriteBatch(StartedMongoSinkTask.java:104)\n\t... 14 more\n"},