MongoDB Kafka Sink Connector w/ Debezium CDC Handler Fails on Update Operations

Hi everyone,

I am trying to set up a CDC replication pipeline with the Debezium source connector and the MongoDB Kafka Sink Connector, but I'm having problems with update operations.

On one hand, I have a MongoDB source database configured as a single-node replica set. Connected to the source DB, the Debezium source connector streams all CDC events to a Kafka topic.

On the other hand, I have a MongoDB instance acting as the sink database. The sink database is fed by the MongoDB Sink Connector configured with the Debezium MongoDB CDC handler.

The source data is properly replicated into the sink only for insert and delete operations. If I update a document in the source collection, the sink connector raises an exception. The CDC update event and the resulting exception are shown below:

DEBEZIUM CDC UPDATE EVENT

{
   "schema": {
      "type": "struct",
      "fields": [
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "before"
         },
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "after"
         },
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "patch"
         },
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "filter"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "array",
                  "items": {
                     "type": "string",
                     "optional": false
                  },
                  "optional": true,
                  "field": "removedFields"
               },
               {
                  "type": "string",
                  "optional": true,
                  "name": "io.debezium.data.Json",
                  "version": 1,
                  "field": "updatedFields"
               },
               {
                  "type": "array",
                  "items": {
                     "type": "struct",
                     "fields": [
                        {
                           "type": "string",
                           "optional": false,
                           "field": "field"
                        },
                        {
                           "type": "int32",
                           "optional": false,
                           "field": "size"
                        }
                     ],
                     "optional": false,
                     "name": "io.debezium.connector.mongodb.changestream.truncatedarray",
                     "version": 1
                  },
                  "optional": true,
                  "field": "truncatedArrays"
               }
            ],
            "optional": true,
            "name": "io.debezium.connector.mongodb.changestream.updatedescription",
            "version": 1,
            "field": "updateDescription"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "string",
                  "optional": false,
                  "field": "version"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "connector"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "name"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "ts_ms"
               },
               {
                  "type": "string",
                  "optional": true,
                  "name": "io.debezium.data.Enum",
                  "version": 1,
                  "parameters": {
                     "allowed": "true,last,false,incremental"
                  },
                  "default": "false",
                  "field": "snapshot"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "db"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "sequence"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "rs"
               },
               {
                  "type": "string",
                  "optional": false,
                  "field": "collection"
               },
               {
                  "type": "int32",
                  "optional": false,
                  "field": "ord"
               },
               {
                  "type": "string",
                  "optional": true,
                  "field": "lsid"
               },
               {
                  "type": "int64",
                  "optional": true,
                  "field": "txnNumber"
               }
            ],
            "optional": false,
            "name": "io.debezium.connector.mongo.Source",
            "field": "source"
         },
         {
            "type": "string",
            "optional": true,
            "field": "op"
         },
         {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
         },
         {
            "type": "struct",
            "fields": [
               {
                  "type": "string",
                  "optional": false,
                  "field": "id"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "total_order"
               },
               {
                  "type": "int64",
                  "optional": false,
                  "field": "data_collection_order"
               }
            ],
            "optional": true,
            "name": "event.block",
            "version": 1,
            "field": "transaction"
         }
      ],
      "optional": false,
      "name": "src.metrics.customers.Envelope"
   },
   "payload": {
      "before": null,
      "after": "{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sallyddf\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}",
      "patch": null,
      "filter": null,
      "updateDescription": {
         "removedFields": null,
         "updatedFields": "{\"first_name\": \"Sallyddf\"}",
         "truncatedArrays": null
      },
      "source": {
         "version": "2.0.0.Final",
         "connector": "mongodb",
         "name": "src",
         "ts_ms": 1669244642000,
         "snapshot": "false",
         "db": "metrics",
         "sequence": null,
         "rs": "rs0",
         "collection": "customers",
         "ord": 2,
         "lsid": null,
         "txnNumber": null
      },
      "op": "u",
      "ts_ms": 1669244642381,
      "transaction": null
   }
}

Sink Connector Exception:

ERROR Unable to process record SinkRecord{kafkaOffset=4, timestampType=CreateTime} ConnectRecord{topic='src.metrics.customers', kafkaPartition=0, key={id=1001}, keySchema=null, value=Struct{after={"_id": {"$numberLong": "1001"},"first_name": "Sallyddf","last_name": "Thomas","email": "sally.thomas@acme.com"},updateDescription=Struct{updatedFields={"first_name": "Sallyddf"}},source=Struct{version=2.0.0.Final,connector=mongodb,name=src,ts_ms=1669244642000,snapshot=false,db=metrics,rs=rs0,collection=customers,ord=2},op=u,ts_ms=1669244642381}, valueSchema=Schema{src.metrics.customers.Envelope:STRUCT}, timestamp=1669244642856, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
metrics-sink-connect    | org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect    |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect    |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect    |       at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect    | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect    |       at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect    |       at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect    |       ... 22 more
metrics-sink-connect    | [2022-11-23 23:04:02,876] ERROR WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Value expected to be of type STRING is of unexpected type NULL (org.apache.kafka.connect.runtime.WorkerSinkTask)
metrics-sink-connect    | org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect    |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect    |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect    |       at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect    | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect    |       at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect    |       at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect    |       ... 22 more
metrics-sink-connect    | [2022-11-23 23:04:02,878] ERROR WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
metrics-sink-connect    | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect    |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect    |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect    |       at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect    | Caused by: org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect    |       ... 10 more
metrics-sink-connect    | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect    |       at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect    |       at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect    |       at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect    |       at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect    |       ... 22 more

I followed all the examples and documentation for Debezium and the MongoDB Sink Connector, and I still have no clue why this is happening.

Please find below the dockerfiles and my configurations:

Debezium Sink Connector Dockerfile

FROM quay.io/debezium/connect:2.0
ENV KAFKA_CONNECT_MONGODB_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-mongodb

USER root
RUN microdnf -y install git maven java-11-openjdk-devel && microdnf clean all

USER kafka

# Deploy MongoDB Sink Connector
RUN mkdir -p $KAFKA_CONNECT_MONGODB_DIR && cd $KAFKA_CONNECT_MONGODB_DIR && \
  git clone https://github.com/hpgrahsl/kafka-connect-mongodb.git && \
  cd kafka-connect-mongodb && \
  git fetch --tags && \
  git checkout tags/v1.2.0 && \
  mvn clean package -DskipTests=true -DskipITs=true && \
  mv target/kafka-connect-mongodb/kafka-connect-mongodb-1.2.0-jar-with-dependencies.jar $KAFKA_CONNECT_MONGODB_DIR && \
  cd .. && rm -rf $KAFKA_CONNECT_MONGODB_DIR/kafka-connect-mongodb


MongoDB Kafka Sink Connector Dockerfile

FROM confluentinc/cp-kafka-connect:7.2.2
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

Debezium Source Connector Configuration

{
  "name": "metrics",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.name": "metrics-src",
    "mongodb.user": "admin",
    "mongodb.password": "admin",
    "mongodb.authsource": "admin",
    "mongodb.hosts": "rs0/metrics-src:27017",
    "topic.prefix": "src",
    "database.include.list": "metrics"
  }
}

MongoDB Sink Configuration with CDC Handler

{
  "name": "metrics",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "connection.uri": "mongodb://metrics-sink:27017/metrics",
    "database": "metrics",
    "collection": "metrics",
    "topics": "src.metrics.customers"
  }
}

Docker Compose File

version: '3.4'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    restart: always
    networks:
        - sync-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOO_4LW_COMMANDS_WHITELIST: "*"
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=ruok"
    healthcheck:
      test: nc -z localhost 2181 || exit -1
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 10s
    extra_hosts:
      - "moby:127.0.0.1"

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    restart: always
    networks:
        - sync-network
    ports:
      - "9092:9092"
      - "39092:39092"
    depends_on:
        zookeeper:
          condition: service_healthy
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://broker:19092,EXTERNAL_LISTENER://0.0.0.0:39092
      KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://localhost:19092,EXTERNAL_LISTENER://150.230.85.73:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,HOST_LISTENER:PLAINTEXT,EXTERNAL_LISTENER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"
    healthcheck:
      test: echo "ruok" | timeout 2 nc -w 2 zookeeper 2181 | grep imok
      interval: 10s
      timeout: 5s
      retries: 3

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    # network_mode: host
    ports:
      - 9000:9000
    networks:
      - sync-network
    depends_on:
        broker:
          condition: service_healthy
    environment:
      KAFKA_BROKERCONNECT: broker:9092

  metrics-src:
    image: mongo:5.0.5
    hostname: metrics-src
    restart: always
    container_name: metrics-src
    ports:
      - 27040:27017
    networks:
      - sync-network
    environment:
      MONGO_INITDB_DATABASE: metrics
    volumes:
      - ./scripts:/scripts
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
      interval: 10s
      start_period: 30s
    command: --replSet rs0 --bind_ip_all

  metrics-sink:
    image: mongo:5.0.5
    hostname: metrics-sink
    restart: always
    container_name: metrics-sink
    ports:
      - 27020:27017
    networks:
      - sync-network
    environment:
      MONGO_INITDB_DATABASE: metrics
    volumes:
      - ./scripts:/scripts
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
      interval: 10s
      start_period: 30s
    command: --replSet rs0 --bind_ip_all

  metrics-src-connect:
    image: quay.io/debezium/connect:2.0
    container_name: metrics-connect
    ports:
     - 8083:8083
    links:
     - broker
     - metrics-src
    networks:
      - sync-network
    volumes:
     - kafka-src-config:/kafka/config
    environment:
     - BOOTSTRAP_SERVERS=broker:9092
     - REST_HOST_NAME=0.0.0.0
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=metrics_src_connect_configs
     - OFFSET_STORAGE_TOPIC=metrics_src_connect_offsets
     - STATUS_STORAGE_TOPIC=metrics_src_connect_status
     - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
     - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
     - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

  # container with mongo kafka plugins
  metrics-sink-connect:
    image: confluentinc/cp-kafka-connect-base:7.2.2
    build:
      context: ./mongodb-kafka-connect
    ports:
      - "8084:8083"
    hostname: metrics-sink-connect
    container_name: metrics-sink-connect
    depends_on:
      - zookeeper
      - broker
    networks:
      - sync-network
    volumes:
     - kafka-sink-config:/kafka/config
    environment:
      KAFKA_JMX_PORT: 35000
      KAFKA_JMX_HOSTNAME: localhost
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: metrics-sink-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-cluster-group
      CONNECT_CONFIG_STORAGE_TOPIC: metrics_sink_connect_configs
      CONNECT_OFFSET_STORAGE_TOPIC: metrics_sink_connect_offsets
      CONNECT_STATUS_STORAGE_TOPIC: metrics_sink_connect_status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_METADATA_MAX_AGE_MS: 180000
      CONNECT_CONNECTIONS_MAX_IDLE_MS: 180000
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_AUTO_CREATE_TOPICS_ENABLE: "true"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter

networks:
  sync-network:
    driver: bridge

volumes:
  kafka-sink-config:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: ./kafka/sink-config

  kafka-src-config:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: ./kafka/src-config

Could someone help me figure out what I might be missing in the configuration?

Best regards,
Paulo

Hi @Paulo_Henrique_Favero_Pereira

First of all, thanks for the detailed information around your question / challenge. Based on what I know, I can tell you right away that there doesn’t seem to be a “straight-forward one-sentence solution” :slight_smile: However, I’ll try to highlight a few things that caught my eye and point you to some potential workarounds:

  1. From what I can tell, you are using the Debezium 2.0 MongoDB source connector. Under the covers this connector uses MongoDB’s change streams feature as well. The problem is that Debezium changed the actual CDC event payload format, which was in fact a “breaking change” compared with the CDC payload format used up to version 1.7 of the connector (a simplified comparison of the two formats follows right after this list).

  2. That being said, the MongoDB sink connector is currently not prepared to properly deal with this new event payload format from Debezium; that goes for the official connector from MongoDB as well as for my community sink connector, which was integrated into the official one at some point in the past (back then with feature parity). Even if it doesn’t fix the issue, I’d highly recommend switching to the official MongoDB connector in your Dockerfile instead of using my community sink connector. What’s even stranger to me is that if you do want to rely on my community version, you shouldn’t use tag 1.2.0, which points to an even older release; the latest version of my community sink was tagged 1.4.0.
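
To make the breaking change a bit more concrete, here is a heavily simplified comparison of the two update event payloads (only the relevant fields are shown and the values are purely illustrative). The “old” oplog-based format (up to roughly Debezium 1.7) carried the update as a patch string:

{
   "payload": {
      "before": null,
      "after": null,
      "patch": "{\"$v\": 1, \"$set\": {\"first_name\": \"Sallyddf\"}}",
      "filter": "{\"_id\": {\"$numberLong\": \"1001\"}}",
      "op": "u"
   }
}

The new change-streams-based format (what your 2.0 connector produces, as in your event above) leaves patch null and moves the information into updateDescription:

{
   "payload": {
      "before": null,
      "after": "{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sallyddf\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}",
      "patch": null,
      "filter": null,
      "updateDescription": {
         "removedFields": null,
         "updatedFields": "{\"first_name\": \"Sallyddf\"}",
         "truncatedArrays": null
      },
      "op": "u"
   }
}

As far as I can tell from your stack trace, the sink’s Debezium MongoDB CDC handler still tries to read patch as a string for updates, which is exactly why it fails with “Value expected to be of type STRING is of unexpected type NULL”.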

Coming back to the actual problem and the breaking change in the event payload format, you might have the following options:

a) You could try to move away from the DBZ source and find a way to get your use case working based on the official MongoDB source connector. I can’t tell if that will work because I don’t know enough details about your use case / requirements. It could be that you need tombstone events, which, depending on the capture mode, aren’t supported in the official MongoDB source if I’m not mistaken.
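
Just to make option a) a bit more tangible, a minimal sketch of an official MongoDB source connector config could look roughly like the following (names taken from your setup, untested, so treat it as a starting point only). If I remember correctly, the official sink also ships a CDC handler for this change stream format (com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler), which you would then use instead of the Debezium handler:

{
  "name": "metrics-src",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://metrics-src:27017/?replicaSet=rs0",
    "database": "metrics",
    "collection": "customers"
  }
}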

b) If you want to stick with the Debezium source connector, you might get away with using version 1.9, which still allows you to configure the “legacy oplog” based CDC and which produces the “old” and, AFAIK, still compatible CDC event payloads for the sink connector.
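
For option b), a rough sketch of the source config with Debezium 1.9 could look like this (in that version capture.mode still accepts the deprecated oplog value, and mongodb.name acts as the topic prefix, so the topic name should stay src.metrics.customers; untested, adjust as needed):

{
  "name": "metrics",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/metrics-src:27017",
    "mongodb.name": "src",
    "mongodb.user": "admin",
    "mongodb.password": "admin",
    "mongodb.authsource": "admin",
    "capture.mode": "oplog",
    "database.include.list": "metrics"
  }
}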

c) If neither a) nor b) works for your case and you want to continue using the DBZ 2.0 source connector, I’m afraid you need to take one of the following actions to get this solved:

  • apply an SMT which changes the CDC payload structure of update events so that the sink connector can work with it (a rough sketch of where such an SMT would be wired in follows after this list)
  • or instead use a stream processing job (Kafka Streams or a ksqlDB query) to do the same thing that an SMT would do
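
To illustrate the first bullet: the SMT itself is something you would have to write yourself (roughly, it would take updateDescription.updatedFields / removedFields and rebuild an old-style patch string from them). Once you have it, wiring it into your sink connector config would look something like this, where the transform class name is just a placeholder for your own implementation:

{
  "name": "metrics",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "connection.uri": "mongodb://metrics-sink:27017/metrics",
    "database": "metrics",
    "collection": "metrics",
    "topics": "src.metrics.customers",
    "transforms": "reshapeUpdate",
    "transforms.reshapeUpdate.type": "com.example.smt.ReshapeDebeziumMongoUpdate"
  }
}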

Anyway, I think it’s good that you reported this issue and raised awareness. Since it’s not trivial to work around this problem, I hope that someone will update the MongoDB sink connector’s CDC handler for Debezium MongoDB so that it is capable of processing the new event payload format.

I hope this helps you. Feel free to comment or ask again if anything is unclear.

THX!

Hi @hpgrahsl,

Thanks for your swift reply.

Later on, I found this issue, which describes a similar problem with update operations and the Debezium CDC handler. Unfortunately, it does not seem to be receiving proper attention. It would be nice if the MongoDB sink connector documentation included a version compatibility table covering MongoDB, Debezium, and any other supported CDC source.

I tried your lib after I had exhausted all other options, and I didn’t notice that I was using version 1.2.0 :slight_smile:.

Moving on to the presented options…

I had tried option “b” before, but I wasn’t aware of the “legacy oplog” config, so it didn’t work. I managed to get it working using the MongoDB Source Connector instead of the Debezium connector, as you mentioned in option “a”, and I even got a step further:

In my use case, I have local DBs with capped collections at multiple clients, and I want to synchronize all the data into a global database. The global DB does not have any capped collections. What was happening was that the source connector was generating “delete” events when a capped collection was full.

I checked out the MongoDB Kafka Connector repository and modified the code to create my own custom connector for capped collections. I created a sink and a CDC change stream connector that do not process delete operations. I don’t know if there were other ways to solve this, but it’s working smoothly, as it should :smiley:.
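
For anyone who runs into the same thing: if I understand the official source connector correctly, its pipeline option might achieve a similar effect without forking the code, by filtering delete events out of the change stream before they ever reach Kafka. I haven’t verified this in my setup, so take it as an idea only:

{
  "name": "metrics-src",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://metrics-src:27017/?replicaSet=rs0",
    "database": "metrics",
    "collection": "customers",
    "pipeline": "[{\"$match\": {\"operationType\": {\"$ne\": \"delete\"}}}]"
  }
}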

Thanks for your input. It helped a lot to expand the horizon of possibilities. I didn’t know that it was possible to solve my issue in the way you mentioned in option “c”.

I hope that the MongoDB team solves the issue regarding the sink connector soon.

Thanks.