I am trying to perform one-way sync from one mongoDB instance to another mongoDB instance using Apache Kafka Connectors. The sink connector is getting failed with the following error.
[2022-03-11 07:55:08,923] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Adding newly assigned partitions: cloud.sample_mflix.users-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:291)
[2022-03-11 07:55:08,931] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Setting offset for partition cloud.sample_mflix.users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ip-172-31-23-237.eu-west-2.compute.internal:9092 (id: 0 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)
[2022-03-11 07:55:08,975] ERROR [onprem-sink|task-0] Unable to process record SinkRecord{kafkaOffset=4, timestampType=CreateTime} ConnectRecord{topic='cloud.sample_mflix.users', kafkaPartition=0, key=[B@27afcd5e, keySchema=Schema{BYTES}, value=[B@101483c4, valueSchema=Schema{BYTES}, timestamp=1646985128566, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData:109)
java.lang.UnsupportedOperationException: RawBsonDocument instances are immutable
at org.bson.RawBsonDocument.put(RawBsonDocument.java:185)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.put(LazyBsonDocument.java:98)
at org.bson.BsonDocument.append(BsonDocument.java:784)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:52)
at java.base/java.util.Optional.ifPresent(Optional.java:178)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
[2022-03-11 07:55:08,994] ERROR [onprem-sink|task-0] WorkerSinkTask{id=onprem-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: RawBsonDocument instances are immutable (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
java.lang.UnsupportedOperationException: RawBsonDocument instances are immutable
at org.bson.RawBsonDocument.put(RawBsonDocument.java:185)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.put(LazyBsonDocument.java:98)
at org.bson.BsonDocument.append(BsonDocument.java:784)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:52)
at java.base/java.util.Optional.ifPresent(Optional.java:178)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
[2022-03-11 07:55:08,996] ERROR [onprem-sink|task-0] WorkerSinkTask{id=onprem-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.UnsupportedOperationException: RawBsonDocument instances are immutable
at org.bson.RawBsonDocument.put(RawBsonDocument.java:185)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.put(LazyBsonDocument.java:98)
at org.bson.BsonDocument.append(BsonDocument.java:784)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:52)
at java.base/java.util.Optional.ifPresent(Optional.java:178)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
[2022-03-11 07:55:08,996] INFO [onprem-sink|task-0] Stopping MongoDB sink task (com.mongodb.kafka.connect.sink.MongoSinkTask:115)
[2022-03-11 07:55:08,998] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Revoke previously assigned partitions cloud.sample_mflix.users-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:310)
[2022-03-11 07:55:08,999] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Member connector-consumer-onprem-sink-0-df476158-422b-4464-a368-6a189abf536a sending LeaveGroup request to coordinator ip-172-31-23-237.eu-west-2.compute.internal:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1060)
[2022-03-11 07:55:09,002] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:972)
[2022-03-11 07:55:09,003] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1000)
[2022-03-11 07:55:09,004] INFO [onprem-sink|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:659)
[2022-03-11 07:55:09,004] INFO [onprem-sink|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:663)
[2022-03-11 07:55:09,004] INFO [onprem-sink|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:669)
[2022-03-11 07:55:09,007] INFO [onprem-sink|task-0] App info kafka.consumer for connector-consumer-onprem-sink-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)