MongoDB Source and Sink Connector: Update Operations Not Upserting Missing Documents in Sink Collection

When using the MongoDB source and sink connectors, an update to a source-collection document that doesn’t exist in the sink collection is not applied there: the document is neither updated nor upserted. I would like to configure the connectors so that such documents are upserted into the sink collection.

For my experiment, I created source-coll and sink-coll in a database called connector-test.
I inserted three documents. Each document either has no model field, has it explicitly set to null, or has it set to “GPT4”, while the rest of the fields are identical (except for _id).

{
  "_id": { "$oid": "67f766d7f22eec57d81ceaf7" },
  "model": null
}
{
  "_id": { "$oid": "680630f23cb5de1e060c2056" }
}
{
  "_id": { "$oid": "680634f03cb5de1e060c205b" },
  "model": "GPT4"
}

I then configured the following source connector:
(We’re using Strimzi for the connector, but that doesn’t seem to be the key issue here.)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: test-source
  namespace: kafka-dev
  labels:
    strimzi.io/cluster: kafka-connect-cluster-dev
spec:
  class: com.mongodb.kafka.connect.MongoSourceConnector
  tasksMax: 1
  config:
    # MongoDB Connection configuration
    connection.uri: "${secrets:kafka-dev/dev-kafka-connect-infisical-secrets:MONGODB_WRTN_COPY_URI}"
    database: connector-test
    collection: source-coll
    change.stream.full.document: updateLookup
    mongodb.ssl.enabled: true
    # Kafka Converter configuration
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    # Output format configuration
    output.format.value: json
    # Error Handling configuration
    errors.tolerance: all
    errors.log.include.messages: true
    errors.log.enable: true
    # Topic configuration
    topic.creation.enable: true
    topic.prefix: test-source.mongo-cdc.json
    topic.creation.default.partitions: 1
    topic.creation.default.replication.factor: 2
    topic.creation.default.local.retention.ms: 86400000 # 1 day
    topic.creation.default.local.retention.bytes: 10737418240 # 10GB
    # Startup configuration
    startup.mode: copy_existing
    pipeline: |
      [
        {
          $match: {
            $and: [
              { "fullDocument.model": { $nin: [null] } }
            ]
          }
        }
      ]
    # Producer configuration
    producer.override.compression.type: gzip
    producer.override.max.request.size: 20971520 # 20MB

After the source connector was connected, only the data with model set to “GPT4” appeared in the topic test-source.mongo-cdc.json.connector-test.source-coll. In other words, data with no model field or with model set to null was filtered out.
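
To make the filtering explicit, here is a minimal plain-Python sketch of how I understand the { $nin: [null] } condition in the pipeline: in a MongoDB query, null matches both an explicit null and a missing field, so excluding null drops both kinds of document. The helper names (MISSING, get_path, passes_nin_null) are made up for illustration and are not part of MongoDB or the connector; the events are simplified change events that carry only fullDocument.

```python
MISSING = object()  # hypothetical sentinel meaning "field not present"


def get_path(doc, dotted_path):
    """Walk a dotted path such as 'fullDocument.model'; return MISSING if absent."""
    current = doc
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return MISSING
        current = current[key]
    return current


def passes_nin_null(event, path="fullDocument.model"):
    """Model of {path: {$nin: [null]}}: reject explicit null and missing fields."""
    value = get_path(event, path)
    return value is not MISSING and value is not None


# Simplified stand-ins for the three change events from source-coll.
events = [
    {"fullDocument": {"_id": "67f766d7f22eec57d81ceaf7", "model": None}},
    {"fullDocument": {"_id": "680630f23cb5de1e060c2056"}},
    {"fullDocument": {"_id": "680634f03cb5de1e060c205b", "model": "GPT4"}},
]

published = [e["fullDocument"]["_id"] for e in events if passes_nin_null(e)]
print(published)  # ['680634f03cb5de1e060c205b']
```

Only the document with model set to “GPT4” passes, which matches what I saw in the topic.
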
After configuring and connecting the sink connector as shown below, naturally only documents with model: GPT4 from the topic were inserted into sink-coll.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: test-sink
  namespace: kafka-dev
  labels:
    strimzi.io/cluster: kafka-connect-cluster-dev
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 1
  autoRestart:
    enabled: true
  config:
    # MongoDB Connection configuration
    connection.uri: "${secrets:kafka-dev/dev-kafka-connect-infisical-secrets:MONGODB_WRTN_COPY_URI}"
    mongodb.ssl.enabled: true
    database: connector-test
    collection: sink-coll
    # Error Handling configuration
    errors.tolerance: all
    errors.log.include.messages: true
    errors.log.enable: true
    # Dead Letter Queue configuration
    errors.deadletterqueue.topic.name: dlq-test-sink
    errors.deadletterqueue.context.headers.enable: true
    errors.deadletterqueue.topic.replication.factor: 2
    # Kafka Converter configuration
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    # Change Data Capture Handler configuration
    change.data.capture.handler: com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
    # Topic configuration
    topics: test-source.mongo-cdc.json.connector-test.source-coll

Therefore, there are 3 documents in source-coll and 1 document in sink-coll.
Now, when I update the model field to “GPT5” for all documents in source-coll, the topic publishes 3 messages with each document showing the model updated to GPT5.
However, since sink-coll only contains the document that originally had model: GPT4, only that document gets updated to GPT5. The documents that originally had no model field or had model: null are not upserted to sink-coll.
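
The difference between what I observed and what I expected can be modelled in a few lines of plain Python. This is only a sketch of the semantics, assuming the sink applies each update event as a write keyed on _id without upsert; I have not confirmed that this is what the connector does internally. The replace_one and apply_all functions and the dict-based "collection" are hypothetical stand-ins, not connector or driver APIs.

```python
def replace_one(collection, doc_id, replacement, upsert=False):
    """Replace the document with this _id; insert it only when upsert=True."""
    if doc_id in collection or upsert:
        collection[doc_id] = dict(replacement, _id=doc_id)
        return True
    return False  # no match and no upsert: the write is a no-op


ids = [
    "67f766d7f22eec57d81ceaf7",  # originally model: null
    "680630f23cb5de1e060c2056",  # originally no model field
    "680634f03cb5de1e060c205b",  # originally model: "GPT4"
]

# The three update events published after setting model to "GPT5" everywhere.
updates = [{"_id": doc_id, "model": "GPT5"} for doc_id in ids]


def apply_all(upsert):
    """Start from the sink's actual state (one document) and apply all updates."""
    sink = {ids[2]: {"_id": ids[2], "model": "GPT4"}}
    for doc in updates:
        replace_one(sink, doc["_id"], doc, upsert=upsert)
    return sink


without_upsert = apply_all(upsert=False)  # what I observed
with_upsert = apply_all(upsert=True)      # what I expected
print(len(without_upsert), len(with_upsert))  # 1 3
```

Without upsert, the two documents that were filtered out earlier never reach sink-coll; with upsert, all three would end up there with model set to “GPT5”.
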

I’m quite shocked to realize this just now, as I had naturally assumed they would be upserted. According to the Write Strategy documentation, it uses ReplaceOneDefaultStrategy by default. I thought this meant it would replace everything regardless of the operationType.
Now that I’m aware of this issue, I’m looking for a solution. Given that update operations don’t perform upserts by default, I suspect other Write Strategies would have the same problem.
I’m wondering how this can be resolved.
connector version: 1.15.0