MongoDB Kafka Connect ChangeStreamHandler does not support truncatedArrays

I am using the ChangeStreamHandler in the MongoDB Kafka sink connector to stream changes from a MongoDB source:

"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
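A minimal sketch of where this setting sits in a sink connector configuration. The connector name, URI, database, and collection are placeholders rather than the actual values; only the topic name is taken from the log output in this post.

```json
{
  "name": "mongo-cdc-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "quickstart.sampleData",
    "connection.uri": "yourSinkMongodbUri",
    "database": "yourSinkDatabase",
    "collection": "yourSinkCollection",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
  }
}
```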

When processing update events from the source MongoDB change stream, the change stream handler fails with an exception:

 ERROR Unable to process record SinkRecord{kafkaOffset=3, timestampType=CreateTime} ConnectRecord{topic='quickstart.sampleData', kafkaPartition=0, key={"_id": {"_data": "8262A5CD4B000000012B022C0100296E5A1004B80560BF7F114B04962A5F523CEAB5D046645F6964006462A5CC9B84956FD488691BF10004"}}, keySchema=Schema{STRING}, value={"_id": {"_data": "8262A5CD4B000000012B022C0100296E5A1004B80560BF7F114B04962A5F523CEAB5D046645F6964006462A5CC9B84956FD488691BF10004"}, "operationType": "update", "clusterTime": {"$timestamp": {"t": 1655033163, "i": 1}}, "ns": {"db": "quickstart", "coll": "sampleData"}, "documentKey": {"_id": {"$oid": "62a5cc9b84956fd488691bf1"}}, "updateDescription": {"updatedFields": {"hello": "moto"}, "removedFields": [], "truncatedArrays": []}}, valueSchema=Schema{STRING}, timestamp=1655033166742, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.apache.kafka.connect.errors.DataException: Warning unexpected field(s) in updateDescription [truncatedArrays]. {"updatedFields": {"hello": "moto"}, "removedFields": [], "truncatedArrays": []}. Cannot process due to risk of data loss.
at com.mongodb.kafka.connect.sink.cdc.mongodb.operations.OperationHelper.getUpdateDocument(OperationHelper.java:99)
at com.mongodb.kafka.connect.sink.cdc.mongodb.operations.Update.perform(Update.java:57)
at com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler.handle(ChangeStreamHandler.java:84)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
at java.base/java.util.Optional.flatMap(Optional.java:294)

Below is the change stream event received on the sink side:

{"schema":{"type":"string","optional":false},"payload":"{\"_id\": {\"_data\": \"8262A5CD4B000000012B022C0100296E5A1004B80560BF7F114B04962A5F523CEAB5D046645F6964006462A5CC9B84956FD488691BF10004\"}, \"operationType\": \"update\", \"clusterTime\": {\"$timestamp\": {\"t\": 1655033163, \"i\": 1}}, \"ns\": {\"db\": \"quickstart\", \"coll\": \"sampleData\"}, \"documentKey\": {\"_id\": {\"$oid\": \"62a5cc9b84956fd488691bf1\"}}, \"updateDescription\": {\"updatedFields\": {\"hello\": \"moto\"}, \"removedFields\": [], \"truncatedArrays\": []}}"}

Looking at the code at:

com.mongodb.kafka.connect.sink.cdc.mongodb.operations.OperationHelper.getUpdateDocument(OperationHelper.java:99)

It shows that getUpdateDocument only handles updatedFields and removedFields in updateDescription; support for truncatedArrays is not present. Is this a bug, or do I need to tune my source connector to somehow stop sending truncatedArrays in change events?
Can someone from the community please help?

My insert and delete events stream successfully from the source to the sink MongoDB.

I am building a data pipeline:
Source MongoDb → Change events → kafka connect → Sink MongoDb

We have this in the backlog for the next release. https://jira.mongodb.org/projects/KAFKA/issues/KAFKA-165

Thanks, Robert,
Is there an ETA for this? It is failing all update-type CDC events on the sink connector side.

Hi Team,
We are facing the same issue in our replication, right after we upgraded the MongoDB FCV from 4.4 to 5.0.
Can you please let us know when a new release of the Kafka connector will be available?

Regards
Alan

We attempt to release a new version every quarter. As a workaround, consider setting publish.full.document.only to true on the source.
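A minimal sketch of that workaround as a source connector configuration. The connector name is hypothetical and the URI, database, and collection are placeholders; only publish.full.document.only comes from the suggestion above.

```json
{
  "name": "mongo-source-full-document-only",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "yourMongodbUri",
    "database": "yourDataBase",
    "collection": "yourCollection",
    "publish.full.document.only": "true"
  }
}
```

With this setting the topic should carry plain documents rather than full change stream events, so the sink would presumably no longer need the ChangeStreamHandler. Treat that as an assumption and verify it against your connector version before relying on it.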

I had the same issue and solved it by setting the following configuration on the source connector:

"change.stream.full.document": "updateLookup"

A full example:

{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "yourMongodbUri",
    "database": "yourDataBase",
    "collection": "yourCollection",
    "change.stream.full.document": "updateLookup"
  }
}