Delete issues with ChangeStreamHandler

Hi, I am having problems when I try to delete a record via CDC. The scenario is a bit complex: I have a source connector on-prem, I replicate to Kafka in the cloud, and a sink connector that writes to MongoDB.

Source Connector:
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
"database": "test",
"collection": "",

"topic.namespace.map": "{\"test.authorization_codes\": \"AUTHORIZATION_CODES.DELETE.OUT_NGOB.TOP\"}",
"output.format.key": "schema",
"output.format.value": "schema",
"output.schema.infer.value": "false",
"change.stream.full.document": "updateLookup",

"poll.max.batch.size": "1000",
"poll.await.time.ms": "5000",

"pipeline": "[{\"$match\": {\"$and\": [{\"operationType\": \"delete\"}, {\"ns.coll\": {\"$regex\": /^(authorization_codes)$/}}]}}]"
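As a side note, with the inner quotes escaped, the "pipeline" value only parses as JSON if the regex is also given as a string rather than a /…/ literal. Assuming the intent is to match the collection name exactly, an all-string form would be:

```json
"pipeline": "[{\"$match\": {\"$and\": [{\"operationType\": \"delete\"}, {\"ns.coll\": {\"$regex\": \"^(authorization_codes)$\"}}]}}]"
```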

Sink:
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",

"database":"testAzure",
"collection":"",

"topics":"AZURE.APIGATEW.AUTHORIZATION_CODES.DELETE.IN_NGOB.TOP",

"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"XXXX",
"value.converter.schemas.enable": "true",
"value.converter.basic.auth.credentials.source":"USER_INFO",
"value.converter.basic.auth.user.info":"XXX",

"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"XXX",
"key.converter.schemas.enable": "true",
"key.converter.basic.auth.credentials.source":"USER_INFO",
"key.converter.basic.auth.user.info":"XXXXXX",

"change.data.capture.handler":"com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",

"topic.override.AZURE.APIGATEW.AUTHORIZATION_CODES.DELETE.IN_NGOB.TOP.collection":"authorization_codes"

I can see the records replicating correctly, so the source connector and the replication work. When I start the sink connector I get this error:

[2023-02-13 18:24:52,128] ERROR task-thread-QH1-000.AZURE.APIGATEW.AUTHORIZATION_CODES.DELETE.IN_NGOB.TOP_azure.hibridacion.sink-0 Unable to process record SinkRecord{kafkaOffset=80919, timestampType=CreateTime} ConnectRecord{topic='AZURE.APIGATEW.AUTHORIZATION_CODES.DELETE.IN_NGOB.TOP', kafkaPartition=0, key=Struct{_id={"_data": "8263EA59C8000000012B022C0100296E5A100424C02CCF0FC14A46A154621CF8D3FAC3463C5F6964003C557154484E5163416C666965785557696E526267000004"}}, keySchema=Schema{keySchema:STRUCT}, value=Struct{_id={"_data": "8263EA59C8000000012B022C0100296E5A100424C02CCF0FC14A46A154621CF8D3FAC3463C5F6964003C557154484E5163416C666965785557696E526267000004"},operationType=delete,ns=Struct{db=testAzure,coll=authorization_codes},documentKey={"_id": "UqTHNQcAlfiexUWinRbg"},clusterTime={"$timestamp": {"t": 1676302792, "i": 1}}}, valueSchema=Schema{ChangeStream:STRUCT}, timestamp=1676303861116, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.apache.kafka.connect.errors.DataException: Unexpected documentKey field type, expecting a document but found BsonString{value='{"_id": "UqTHNQcAlfiexUWinRbg"}'}: {"_id": "{\"_data\": \"8263EA59C8000000012B022C0100296E5A100424C02CCF0FC14A46A154621CF8D3FAC3463C5F6964003C557154484E5163416C666965785557696E526267000004\"}", "operationType": "delete", "fullDocumentBeforeChange": null, "fullDocument": null, "ns": {"db": "testAzure", "coll": "authorization_codes"}, "to": null, "documentKey": "{\"_id\": \"UqTHNQcAlfiexUWinRbg\"}", "updateDescription": null, "clusterTime": "{\"$timestamp\": {\"t\": 1676302792, \"i\": 1}}", "txnNumber": null, "lsid": null}
at com.mongodb.kafka.connect.sink.cdc.mongodb.operations.OperationHelper.getDocumentKey(OperationHelper.java:53)
at com.mongodb.kafka.connect.sink.cdc.mongodb.operations.Delete.perform(Delete.java:42)
at com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler.handle(ChangeStreamHandler.java:84)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
at java.base/java.util.Optional.flatMap(Optional.java:294)
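If I read the exception right, the handler receives documentKey as a plain string holding extended JSON instead of a nested document. A minimal Python sketch of that mismatch (the _id value is copied from the log above; `json` just stands in for the sink's BSON handling, this is only an illustration, not the connector's code):

```python
import json

# documentKey as it apparently arrives at the sink: a string
# containing extended JSON, not a nested document
document_key_raw = '{"_id": "UqTHNQcAlfiexUWinRbg"}'

# What a delete handler would need to build its filter:
# an actual document (modeled here as a dict)
document_key_doc = json.loads(document_key_raw)

print(type(document_key_raw).__name__)  # str -> this is what gets rejected
print(document_key_doc["_id"])          # UqTHNQcAlfiexUWinRbg
```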

I don't know what else to try. Could you help me?