Kafka Sink Connectors are failing with "RawBsonDocument instances are immutable" Error

I am trying to perform a one-way sync from one MongoDB instance to another using Apache Kafka connectors. The sink connector fails with the following error:

[2022-03-11 07:55:08,923] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Adding newly assigned partitions: cloud.sample_mflix.users-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:291)
[2022-03-11 07:55:08,931] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Setting offset for partition cloud.sample_mflix.users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ip-172-31-23-237.eu-west-2.compute.internal:9092 (id: 0 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)
[2022-03-11 07:55:08,975] ERROR [onprem-sink|task-0] Unable to process record SinkRecord{kafkaOffset=4, timestampType=CreateTime} ConnectRecord{topic='cloud.sample_mflix.users', kafkaPartition=0, key=[B@27afcd5e, keySchema=Schema{BYTES}, value=[B@101483c4, valueSchema=Schema{BYTES}, timestamp=1646985128566, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData:109)
java.lang.UnsupportedOperationException: RawBsonDocument instances are immutable
	at org.bson.RawBsonDocument.put(RawBsonDocument.java:185)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.put(LazyBsonDocument.java:98)
	at org.bson.BsonDocument.append(BsonDocument.java:784)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:52)
	at java.base/java.util.Optional.ifPresent(Optional.java:178)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
	at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
[2022-03-11 07:55:08,994] ERROR [onprem-sink|task-0] WorkerSinkTask{id=onprem-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: RawBsonDocument instances are immutable (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
java.lang.UnsupportedOperationException: RawBsonDocument instances are immutable
	at org.bson.RawBsonDocument.put(RawBsonDocument.java:185)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.put(LazyBsonDocument.java:98)
	at org.bson.BsonDocument.append(BsonDocument.java:784)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:52)
	at java.base/java.util.Optional.ifPresent(Optional.java:178)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
	at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
[2022-03-11 07:55:08,996] ERROR [onprem-sink|task-0] WorkerSinkTask{id=onprem-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.UnsupportedOperationException: RawBsonDocument instances are immutable
	at org.bson.RawBsonDocument.put(RawBsonDocument.java:185)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.put(LazyBsonDocument.java:98)
	at org.bson.BsonDocument.append(BsonDocument.java:784)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:52)
	at java.base/java.util.Optional.ifPresent(Optional.java:178)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
	at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

[2022-03-11 07:55:08,996] INFO [onprem-sink|task-0] Stopping MongoDB sink task (com.mongodb.kafka.connect.sink.MongoSinkTask:115)
[2022-03-11 07:55:08,998] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Revoke previously assigned partitions cloud.sample_mflix.users-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:310)
[2022-03-11 07:55:08,999] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Member connector-consumer-onprem-sink-0-df476158-422b-4464-a368-6a189abf536a sending LeaveGroup request to coordinator ip-172-31-23-237.eu-west-2.compute.internal:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1060)
[2022-03-11 07:55:09,002] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:972)
[2022-03-11 07:55:09,003] INFO [onprem-sink|task-0] [Consumer clientId=connector-consumer-onprem-sink-0, groupId=connect-onprem-sink] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1000)
[2022-03-11 07:55:09,004] INFO [onprem-sink|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:659)
[2022-03-11 07:55:09,004] INFO [onprem-sink|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:663)
[2022-03-11 07:55:09,004] INFO [onprem-sink|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:669)
[2022-03-11 07:55:09,007] INFO [onprem-sink|task-0] App info kafka.consumer for connector-consumer-onprem-sink-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)

Hi @Vittal_Pai,

I’ve added KAFKA-301 to fix the issue. When converting `byte[]` data from Kafka, the connector uses RawBsonDocument instances, which are immutable, so post processors cannot add any fields to them.
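To illustrate what is going on under the hood, here is a small sketch using the `org.bson` API (it assumes the `org.mongodb:bson` artifact is on the classpath; the class name `RawBsonDemo` is just for this example). The first `put` is effectively what `DocumentIdAdder` attempts in the stack trace above, and it throws; decoding into a plain `BsonDocument` produces a mutable copy that accepts the `_id`:

```java
import org.bson.BsonDocument;
import org.bson.BsonObjectId;
import org.bson.RawBsonDocument;
import org.bson.codecs.BsonDocumentCodec;
import org.bson.types.ObjectId;

public class RawBsonDemo {
    public static void main(String[] args) {
        // A RawBsonDocument is a read-only view over raw BSON bytes.
        RawBsonDocument raw = RawBsonDocument.parse("{\"name\": \"test\"}");

        // This is what DocumentIdAdder effectively does -- it throws
        // UnsupportedOperationException("RawBsonDocument instances are immutable").
        try {
            raw.put("_id", new BsonObjectId(new ObjectId()));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }

        // Decoding into a plain BsonDocument yields a mutable copy
        // that post processors can safely add fields to.
        BsonDocument mutable = raw.decode(new BsonDocumentCodec());
        mutable.put("_id", new BsonObjectId(new ObjectId()));
        System.out.println(mutable.containsKey("_id"));
    }
}
```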

The workaround until a fix is released is to add a post processor that converts the RawBsonDocument into a BsonDocument, which can be mutated.
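For reference, custom post processors are wired in via the sink connector's `post.processor.chain` property. A sketch of the relevant config line is below; `com.example.RawToMutablePostProcessor` is a hypothetical class you would implement and package yourself, placed ahead of the `DocumentIdAdder` so the conversion happens before any field is added:

```properties
# Hypothetical custom processor that decodes the RawBsonDocument into a
# mutable BsonDocument; it must run before DocumentIdAdder in the chain.
post.processor.chain=com.example.RawToMutablePostProcessor,com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
```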

Ross


Thanks @Ross_Lawley for the quick answer.
