Delete Operations with MongoDB Kafka Connector

I’m new to Kafka connectors and am trying to set up a simple sync from one MongoDB instance to another. Insert and update operations work with a source connector that has the “publish.full.document.only” property set to “true”. That setup doesn’t work for delete operations, and after reading another post, it looks like deletes can’t be handled while that property is set. So I created a separate source connector just for deletes. That source connector puts the delete messages on the topic as I would expect, but the sink doesn’t behave as expected: instead of deleting the document from the target DB, it inserts a new document that contains only the metadata from the topic message. Does anyone know how to configure the sink to delete the document based on the _id field?

This is an example of what the document looks like. It exists in both the source and target databases and is identical in both, including the _id field:

{
  "_id": {
    "$oid": "644be533bbb4f2ec05a44d1e"
  },
  "test_field": "Sample Document",
  "another_field": 5
}

This is what the source connector config looks like:

{
  "name": "MongoTestSourceDeleteConnector",
  "config": {
    "name": "MongoTestSourceDeleteConnector",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "dropPrefix",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "source_db.kafka_source_test_data",
    "transforms.dropPrefix.replacement": "kafka_source_test_data_delete_changes",
    "connection.uri": "<redacted>",
    "database": "source_db",
    "collection": "kafka_source_test_data",
    "pipeline": "[{\"$match\": {\"operationType\": \"delete\"}}]",
    "publish.full.document.only": "false",
    "change.stream.full.document": "default",
    "mongo.errors.log.enable": "true"
  }
}
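
The “pipeline” value in this config is itself a JSON array serialized into a string, which is why its inner quotes are escaped. A minimal Python sketch of producing that escaped value with the standard library rather than escaping it by hand might look like this (the variable names are illustrative only and not part of the connector):

```python
import json

# Aggregation pipeline that keeps only delete events from the change stream.
pipeline = [{"$match": {"operationType": "delete"}}]

# The connector property holds the pipeline as a string, so serialize it first.
pipeline_value = json.dumps(pipeline)

# Serializing the surrounding config adds the backslash escapes seen above.
config_fragment = json.dumps({"pipeline": pipeline_value})
print(config_fragment)
```

Generating the string this way avoids quoting mistakes if the pipeline grows beyond a single “$match” stage.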

When I delete the document from the source DB, this is the event that gets put on the Kafka topic.

Key:

{ "_id": { "_data": "82644BE717000000072B022C0100296E5A10040691E9A2364740A1960FB29047C695CB46645F69640064644BE533BBB4F2EC05A44D1E0004" } }

Value:

{
  "_id": {
    "_data": "82644BE717000000072B022C0100296E5A10040691E9A2364740A1960FB29047C695CB46645F69640064644BE533BBB4F2EC05A44D1E0004"
  },
  "operationType": "delete",
  "clusterTime": {
    "$timestamp": {
      "t": 1682695959,
      "i": 7
    }
  },
  "ns": {
    "db": "source_db",
    "coll": "kafka_source_test_data"
  },
  "documentKey": {
    "_id": {
      "$oid": "644be533bbb4f2ec05a44d1e"
    }
  }
}
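
To make the intended behavior concrete, the following Python sketch derives the delete filter that the sink would ideally apply for this event. It is only an illustration using the standard library: the function name `delete_filter_from_event` is hypothetical, and the sample value is a shortened copy of the event above with the resume token replaced by a placeholder.

```python
import json


def delete_filter_from_event(event_value: str) -> dict:
    """Return the filter identifying the document a delete event refers to."""
    event = json.loads(event_value)
    if event.get("operationType") != "delete":
        raise ValueError("not a delete event")
    # documentKey carries the _id of the deleted document; the top-level _id
    # of a change event is only the change stream resume token.
    return {"_id": event["documentKey"]["_id"]}


# Shortened version of the event value shown above (placeholder resume token).
sample_value = json.dumps({
    "_id": {"_data": "<resume token>"},
    "operationType": "delete",
    "documentKey": {"_id": {"$oid": "644be533bbb4f2ec05a44d1e"}},
})

delete_filter = delete_filter_from_event(sample_value)
print(delete_filter)
```

The filter here is still in Extended JSON form, so the “$oid” wrapper would need to be converted to a real ObjectId before being passed to a driver’s delete call.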

This is what the sink connector config looks like:

{
  "name": "MongoTestSinkConnector",
  "config": {
    "name": "MongoTestSinkConnector",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "errors.log.enable": "true",
    "topics": "kafka_source_test_data_delete_changes",
    "connection.uri": "<redacted>",
    "database": "target_db",
    "collection": "kafka_sink_test_data",
    "namespace.mapper.error.if.invalid": "false",
    "delete.on.null.values": "true",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
    "mongo.errors.log.enable": "true",
    "change.data.capture.handler": ""
  }
}

When the sink connector picks up the event from the topic, the original document that I want to delete isn’t modified at all. Instead, this document gets inserted into the target DB:

{
  "_id": {
    "_data": "82644BE717000000072B022C0100296E5A10040691E9A2364740A1960FB29047C695CB46645F69640064644BE533BBB4F2EC05A44D1E0004"
  },
  "operationType": "delete",
  "clusterTime": {
    "$timestamp": {
      "t": 1682695959,
      "i": 7
    }
  },
  "ns": {
    "db": "source_db",
    "coll": "kafka_source_test_data"
  },
  "documentKey": {
    "_id": {
      "$oid": "644be533bbb4f2ec05a44d1e"
    }
  }
}
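
One possible reading of this result is sketched below in Python. It is a simplified model and not the connector’s actual code. It assumes that “delete.on.null.values” only applies to records whose value is null, and that ProvidedInKeyStrategy takes the _id from the record key, which is how those settings are generally described. The function name `model_sink_write` is hypothetical.

```python
from typing import Optional


def model_sink_write(key: dict, value: Optional[dict]) -> dict:
    """Assumed model of the sink's decision with delete.on.null.values=true."""
    # Assumption: ProvidedInKeyStrategy uses the record key's _id as the document _id.
    doc_id = key["_id"]
    if value is None:
        # Assumption: only a null-valued (tombstone) record is treated as a delete.
        return {"op": "delete", "filter": {"_id": doc_id}}
    # Any non-null value is written as a document under the key's _id.
    return {"op": "upsert", "filter": {"_id": doc_id}, "document": {**value, "_id": doc_id}}


# Shortened key and value from the delete event (placeholder resume token).
key = {"_id": {"_data": "<resume token>"}}
value = {
    "_id": {"_data": "<resume token>"},
    "operationType": "delete",
    "documentKey": {"_id": {"$oid": "644be533bbb4f2ec05a44d1e"}},
}

result = model_sink_write(key, value)
print(result["op"], result["filter"])
```

Under these assumptions the delete event has a non-null value and a key whose _id is the resume token, so it is written as a new document rather than matched against the original _id. If that reading is right, the event would need to be interpreted as a change stream event instead of a plain document, which may be what the currently empty “change.data.capture.handler” setting is for.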