MongoDB sink connector not capturing data as expected - Kafka?

I am currently using a MySQL database as the source via the Debezium source connector config below. I want to monitor changes to the database and send them to MongoDB.

Here’s my source connector config,

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "source_mysql_connector",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "host.docker.internal",  
    "database.port": "3306",
    "database.user": "test",
    "database.password": "$apr1$o7RbW.GvrPIY1",
    "database.server.id": "8111999",  
    "database.server.name": "db_source",  
    "database.include.list": "example",  
    "database.history.kafka.bootstrap.servers": "broker:29092",  
    "database.history.kafka.topic": "schema-changes.example",
    "database.allowPublicKeyRetrieval":"true",
    "include.schema.changes": "true"
  }
}'

Here’s my sink connector (mongodb) config,

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "sink_mongodb_connector",  
  "config": {  
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max":"1",
      "topics":"db_source.example.employees",
      "connection.uri":"mongodb://172.17.0.1:27017/example?w=1&journal=true",
      "database":"example",
      "collection":"employees",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}'

Using this I was able to establish the connection, capture the data changes for a table called employees, and store them in a MongoDB collection,

But the problem here is that when I checked the collection in MongoDB, the documents were saved like this,

{ "_id" : ObjectId("60d0e6939e00e22f274ccac1"), "before" : null, "after" : { "id" : NumberLong(11), "name" : "Steve Shining", "team" : "DevOps", "birthday" : 11477 }, "source" : { "version" : "1.5.0.Final", "connector" : "mysql", "name" : "db_source", "ts_ms" : NumberLong("1624303251000"), "snapshot" : "false", "db" : "example", "sequence" : null, "table" : "employees", "server_id" : NumberLong(6030811), "gtid" : null, "file" : "mysql-bin.000003", "pos" : NumberLong(5445), "row" : 2, "thread" : null, "query" : null }, "op" : "c", "ts_ms" : NumberLong("1624303251190"), "transaction" : null }

{ "_id" : ObjectId("60d0e6939e00e22f274ccac2"), "before" : null, "after" : { "id" : NumberLong(12), "name" : "John", "team" : "Support", "birthday" : 6270 }, "source" : { "version" : "1.5.0.Final", "connector" : "mysql", "name" : "db_source", "ts_ms" : NumberLong("1624303251000"), "snapshot" : "false", "db" : "example", "sequence" : null, "table" : "employees", "server_id" : NumberLong(6030811), "gtid" : null, "file" : "mysql-bin.000003", "pos" : NumberLong(5445), "row" : 3, "thread" : null, "query" : null }, "op" : "c", "ts_ms" : NumberLong("1624303251190"), "transaction" : null }

But my MySQL table looks like this,

mysql> select * from employees;
+----+---------------+---------+------------+
| id | name          | team    | birthday   |
+----+---------------+---------+------------+
|  1 | Peter Smith   | DevOps  | 2003-07-21 |
| 11 | Steve Shining | DevOps  | 2001-06-04 |
| 12 | John          | Support | 1987-03-03 |
+----+---------------+---------+------------+

I want my collections to look like this,

{ "_id" : ObjectId("60d0e6939e00e22f274ccac2"), "name" : "John", "team" : "Support", "birthday" : "1987-03-03 "}

What am I doing wrong here? Even delete events are stored in the collection like this; the sink does not seem to interpret the messages at all. How do I fix it? The dates are not stored properly either.

You might be able to use a post-processor in the sink connector to project only the fields you need, such as after.name, after.team, etc.
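For example, something along these lines in the sink connector config might work (a sketch only; the field-projection post-processor properties and the AllowList value should be checked against the docs for your connector version, and I'm not certain nested after.* paths are supported there):

"value.projection.type": "AllowList",
"value.projection.list": "after.name,after.team,after.birthday"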

Hi @Abhi

When you want the MongoDB sink connector to process CDC events as created by Debezium - in your example from a MySQL instance - you have to make sure the sink connector is configured properly.

Read about the options here: https://docs.mongodb.com/kafka-connector/current/kafka-sink-cdc/#change-data-capture-using-debezium

The most important thing for your example is to configure the following sink connector property:

"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.RdbmsHandler"

This should do the trick to insert/update/delete the actual data into the target collection the sink connector writes to.
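For reference, here is a sketch of the sink connector config from the original post with only that one property added (everything else unchanged):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "sink_mongodb_connector",
  "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max": "1",
      "topics": "db_source.example.employees",
      "connection.uri": "mongodb://172.17.0.1:27017/example?w=1&journal=true",
      "database": "example",
      "collection": "employees",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.RdbmsHandler"
  }
}'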


Hi @Abhi, I am also doing a similar thing. I followed @hpgrahsl's solution and the data is now being inserted the way I want, but there is a problem with the date field. In MySQL the date is stored in a datetime format, and I want it stored as a string literal datetime expression in MongoDB, but it is getting stored as a long timestamp expression. Can you please help?

Thanks

Assuming you are using the Debezium MySQL source connector, right? When it captures your changes from the MySQL table it converts temporal types accordingly. So if you look into the corresponding Kafka topic you should see that the field in question is already stored with a numeric type. You can find the details in the Debezium docs here: Debezium connector for MySQL :: Debezium Documentation (make sure to verify which version of Debezium you are running so that you don't read the docs for a different version).

So I'm afraid the MongoDB sink connector cannot directly give you the temporal type conversion you want. You can look into Kafka Connect transformations (SMTs) and check whether you can configure what you need; the TimestampConverter is a good starting point. Your problem will most likely be that the date/time field is a nested field within the Kafka record's payload, and I don't think that is supported by the TimestampConverter directly. You can combine multiple SMTs, and if you cannot do it with the existing pre-built SMTs you might fall back to writing your own custom SMT.
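Just to illustrate, this is roughly what a TimestampConverter SMT could look like on the sink connector if the field were a top-level Connect date/timestamp field named birthday (field name taken from the example above; as said, with the nested after.birthday field in the Debezium envelope and its numeric epoch-days representation this alone will most likely not be enough):

"transforms": "formatBirthday",
"transforms.formatBirthday.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatBirthday.field": "birthday",
"transforms.formatBirthday.target.type": "string",
"transforms.formatBirthday.format": "yyyy-MM-dd"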

Thank you @hpgrahsl. My issue was resolved by using the Debezium MySQL connector's built-in support for converting MySQL's temporal types to Kafka Connect's temporal types. I added these two parameters to my MySQL connector config:

"time.precision.mode": "connect",
"time.integral.types": "timestamp,date"


If the built-in temporal conversions work for you, they are definitely the easiest and in that case the better option. Happy to hear you got it working according to your requirements!