MongoDB sink connector not capturing data as expected - Kafka?

I am currently using a MySQL database as a source connector with the config below; I want to monitor changes to the database and send them to MongoDB.

Here’s my source connector config:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "source_mysql_connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "host.docker.internal",
    "database.port": "3306",
    "database.user": "test",
    "database.password": "$apr1$o7RbW.GvrPIY1",
    "database.server.id": "8111999",
    "database.server.name": "db_source",
    "database.include.list": "example",
    "database.history.kafka.bootstrap.servers": "broker:29092",
    "database.history.kafka.topic": "schema-changes.example",
    "database.allowPublicKeyRetrieval": "true",
    "include.schema.changes": "true"
  }
}'

Here’s my sink connector (MongoDB) config:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "sink_mongodb_connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "db_source.example.employees",
    "connection.uri": "mongodb://172.17.0.1:27017/example?w=1&journal=true",
    "database": "example",
    "collection": "employees",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}'

Using this I was able to establish the connection, capture the data changes, and store them in a MongoDB collection for a table called employees.

But when I checked the collection in MongoDB, the documents were saved like this:

{ "_id" : ObjectId("60d0e6939e00e22f274ccac1"), "before" : null, "after" : { "id" : NumberLong(11), "name" : "Steve Shining", "team" : "DevOps", "birthday" : 11477 }, "source" : { "version" : "1.5.0.Final", "connector" : "mysql", "name" : "db_source", "ts_ms" : NumberLong("1624303251000"), "snapshot" : "false", "db" : "example", "sequence" : null, "table" : "employees", "server_id" : NumberLong(6030811), "gtid" : null, "file" : "mysql-bin.000003", "pos" : NumberLong(5445), "row" : 2, "thread" : null, "query" : null }, "op" : "c", "ts_ms" : NumberLong("1624303251190"), "transaction" : null }

{ "_id" : ObjectId("60d0e6939e00e22f274ccac2"), "before" : null, "after" : { "id" : NumberLong(12), "name" : "John", "team" : "Support", "birthday" : 6270 }, "source" : { "version" : "1.5.0.Final", "connector" : "mysql", "name" : "db_source", "ts_ms" : NumberLong("1624303251000"), "snapshot" : "false", "db" : "example", "sequence" : null, "table" : "employees", "server_id" : NumberLong(6030811), "gtid" : null, "file" : "mysql-bin.000003", "pos" : NumberLong(5445), "row" : 3, "thread" : null, "query" : null }, "op" : "c", "ts_ms" : NumberLong("1624303251190"), "transaction" : null }

But my MySQL table looks like this:

mysql> select * from employees;
+----+---------------+---------+------------+
| id | name          | team    | birthday   |
+----+---------------+---------+------------+
|  1 | Peter Smith   | DevOps  | 2003-07-21 |
| 11 | Steve Shining | DevOps  | 2001-06-04 |
| 12 | John          | Support | 1987-03-03 |
+----+---------------+---------+------------+

I want the documents in my collection to look like this:

{ "_id" : ObjectId("60d0e6939e00e22f274ccac2"), "name" : "John", "team" : "Support", "birthday" : "1987-03-03 "}

What am I doing wrong here? Even delete events are stored in the collection like this; the sink doesn't seem to interpret the change messages at all. How do I fix it? And why aren't the dates stored properly?

You might be able to use a post-processor in the sink connector to allow only certain fields, like after.name, after.team, etc.
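For example (a sketch using the sink connector's value projection properties; the exact field list is an assumption based on your schema, so adjust as needed), you could add these properties to the sink config:

"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.AllowListValueProjector",
"value.projection.type": "AllowList",
"value.projection.list": "after.name,after.team,after.birthday"

Note that this only trims fields from the record value; it won't flatten the after sub-document or handle deletes, so the CDC handler suggested below is the more complete fix.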

Hi @Abhi

When you want the MongoDB Sink Connector to process CDC events created by Debezium - in your example, from a MySQL instance - you have to make sure to configure the sink connector properly.

Read about the options here: https://docs.mongodb.com/kafka-connector/current/kafka-sink-cdc/#change-data-capture-using-debezium

The most important thing for your example is to configure the following sink connector property:

"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.RdbmsHandler"

This should do the trick: the actual data will then be inserted, updated, or deleted in the target collection the sink connector writes to.
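For reference, the sink connector config from the question would then look like this (a sketch, not tested against your exact environment):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "sink_mongodb_connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "db_source.example.employees",
    "connection.uri": "mongodb://172.17.0.1:27017/example?w=1&journal=true",
    "database": "example",
    "collection": "employees",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.RdbmsHandler"
  }
}'

With the handler in place the sink unwraps the Debezium envelope: inserts and updates are written as plain documents keyed by the row's primary key, and delete events remove the corresponding document instead of being stored verbatim.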
