MongoDB Kafka Connect - Sink connector failing for updates

Hi All,
Iam new to MongoDB Kafka connect. I am trying sync the change stream from 1 mongo collection to another using Kafka connectors, both Inserts and updates operations
Using the quickstart guide published for kafka-connectors.

Source config-

 {
"name": "mongo-sourceV2",
"config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1:27017/?replicaSet=rs0",
    "database": "quickstart",
    "collection": "transactionV2",
    "pipeline": "[{\"$match\":{\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}}]"
}}

Sink Config -

{
"name": "mongo-sinkV2",
"config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://mongo1:27017/?replicaSet=rs0",
    "database": "quickstart",
    "collection": "transactionV1",
    "topics": "quickstart.transactionV2",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "mongo.errors.tolerance": "all",
    "mongo.errors.log.enable": true,
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}}

Kafka Topic Event for update -

{"schema":{"type":"string","optional":false},"payload":"{\"_id\": {\"_data\": \"8262A512F7000000012B022C0100296E5A1004195DB8CC822F4A4FAE4ECCE5917B98A946645F6964006462A512A5F74E67E722B3B6760004\"}, \"operationType\": \"update\", \"clusterTime\": {\"$timestamp\": {\"t\": 1654985463, \"i\": 1}}, \"ns\": {\"db\": \"quickstart\", \"coll\": \"transactionV2\"}, \"documentKey\": {\"_id\": {\"$oid\": \"62a512a5f74e67e722b3b676\"}}, \"updateDescription\": {\"updatedFields\": {\"amount\": 10001}, \"removedFields\": [], \"truncatedArrays\": []}}"}

My Inserts are streaming fine but the updates are failing on the sink connector side with Exception

[2022-06-11 22:11:07,195] ERROR Unable to process record SinkRecord{kafkaOffset=9, timestampType=CreateTime} ConnectRecord{topic='quickstart.transactionV2', kafkaPartition=0, key={"_id": {"_data": "8262A512F7000000012B022C0100296E5A1004195DB8CC822F4A4FAE4ECCE5917B98A946645F6964006462A512A5F74E67E722B3B6760004"}}, keySchema=Schema{STRING}, value={"_id": {"_data": "8262A512F7000000012B022C0100296E5A1004195DB8CC822F4A4FAE4ECCE5917B98A946645F6964006462A512A5F74E67E722B3B6760004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1654985463, "i": 1}}, "ns": {"db": "quickstart", "coll": "transactionV2"}, "documentKey": {"_id": {"$oid": "62a512a5f74e67e722b3b676"}}, "updateDescription": {"updatedFields": {"amount": 10001}, "removedFields": [], "truncatedArrays": []}}, valueSchema=Schema{STRING}, timestamp=1654985467191, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)

org.apache.kafka.connect.errors.DataException: Warning unexpected field(s) in updateDescription [truncatedArrays]. {"updatedFields": {"amount": 10001}, "removedFields": [], "truncatedArrays": []}. Cannot process due to risk of data loss.
at com.mongodb.kafka.connect.sink.cdc.mongodb.operations.OperationHelper.getUpdateDocument(OperationHelper.java:99)
at com.mongodb.kafka.connect.sink.cdc.mongodb.operations.Update.perform(Update.java:57)
at com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler.handle(ChangeStreamHandler.java:84)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
at java.base/java.util.Optional.flatMap(Optional.java:294)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)

mongodb

I had the same issue here and i could solve that setting up the following configuration at the Source Connector :

"change.stream.full.document": "updateLookup"

A Full Exemple:

{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "yourMongodbUri",
    "database": "yourDataBase",
    "collection": "yourCollection",
    "change.stream.full.document": "updateLookup"
  }
}