How can I sync a few tables from MySQL DB to MongoDB?

Hello all,

Is it possible to sync my local MySQL database (only a few tables) with MongoDB Atlas? Any record inserted into MySQL should also be populated into MongoDB.

What are the options I have in this scenario?

Hi @Abhi ,

You can use a Kafka connector to read data from MySQL and write it to MongoDB using MongoDB's own Kafka connector.

Thanks
Pavel


@Pavel_Duchovny thank you for providing the information. Is there any documentation or article I can go through for detailed steps? I'm a bit new to MongoDB and I'm not familiar with Kafka either.

In my MySQL DB, I just need one table to sync with MongoDB; that's my use case.

I didn't find a single article that covers it all…

But if you define MySQL CDC as a source:

And MongoDB as a sink listening to the same topics, it should work as the data flows in…
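Roughly, you wire the two together through the Kafka topic name. A trimmed sketch (hostnames, credentials, database and table names below are placeholders, and a real config needs a few more settings):

Source (Debezium MySQL CDC):

{
  "name": "mysql_cdc_source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.user": "user",
    "database.password": "password",
    "database.server.name": "dbserver1",
    "database.include.list": "mydb",
    "database.history.kafka.bootstrap.servers": "broker:29092",
    "database.history.kafka.topic": "schema-changes.mydb"
  }
}

Sink (MongoDB), listening on the topic the source produces (serverName.database.table):

{
  "name": "mongodb_sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "dbserver1.mydb.mytable",
    "connection.uri": "mongodb://user:password@mongo-host:27017/",
    "database": "mydb",
    "collection": "mytable"
  }
}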



@Pavel_Duchovny Thanks for the resources. I was able to set up most of it, but I'm facing an issue with data types: my MongoDB sink connector is not able to convert the output of the MySQL CDC source.

Here's my source connector config command:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '''{
  "name": "source_mysql_connector",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "host.docker.internal",  
    "database.port": "3306",
    "database.user": "abhijith",
    "database.password": "$apr1$o7RbW.xt$8.GZtOoAhXvRqyYGvrPIY1",
    "database.server.id": "8111999",  
    "database.server.name": "db_source",  
    "database.include.list": "example",  
    "database.history.kafka.bootstrap.servers": "broker:29092",  
    "database.history.kafka.topic": "schema-changes.example",
    "database.allowPublicKeyRetrieval":"true",
  }
}'''

Here's my sink connector request:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '''{
  "name": "sink_mongodb_connector",  
  "config": {  
      "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max":"1",
      "topics":"db_source.example.employees",
      "connection.uri":"mongodb://abhi:$apr1$o7RbW.xt$8.GZtOoAhXvRqyYGvrPIY1@172.17.0.1:27017/",
      "database":"example",
      "collection":"employees",
      "key.converter":"org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false"
  }
}'''

Right after I start the MongoDB sink connector, I get the error below. I understand it means the sink connector can't parse the output of the MySQL source connector, but I don't know how to fix it.

I tried adding this setting to the MySQL source connector, but it didn't help:

"transforms": "unwrap"


connect            | [2021-06-21 02:02:40,083] INFO Kafka version: 6.2.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-06-21 02:02:40,083] INFO Kafka commitId: 1a5755cf9401c84f (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-06-21 02:02:40,083] INFO Kafka startTimeMs: 1624240960083 (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-06-21 02:02:40,090] INFO interceptor=confluent.monitoring.interceptor.connector-consumer-sink_mongodb_connector-0 created for client_id=connector-consumer-sink_mongodb_connector-0 client_type=CONSUMER session= cluster=pT-3hoQ9Qfi52n58kayl-Q group=connect-sink_mongodb_connector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-06-21 02:02:40,093] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-sink_mongodb_connector-0] Cluster ID: pT-3hoQ9Qfi52n58kayl-Q (org.apache.kafka.clients.Metadata)
connect            | [2021-06-21 02:02:40,096] ERROR WorkerSinkTask{id=sink_mongodb_connector-0} Error converting message key in topic 'db_source.example.employees' partition 0 at offset 0 and timestamp 1624240077642: Converting byte[] to Kafka Connect data failed due to serialization error:  (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
connect            | 	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
connect            | 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            | 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            | 	at java.base/java.lang.Thread.run(Thread.java:829)
connect            | Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
connect            |  at [Source: (byte[])"Struct{id=2}"; line: 1, column: 8]
connect            | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
connect            |  at [Source: (byte[])"Struct{id=2}"; line: 1, column: 8]
connect            | 	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
connect            | 	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722)
connect            | 	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3560)
connect            | 	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2655)
connect            | 	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:857)
connect            | 	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
connect            | 	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
connect            | 	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
connect            | 	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
connect            | 	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
connect            | 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            | 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            | 	at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-06-21 02:02:40,098] ERROR WorkerSinkTask{id=sink_mongodb_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect            | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            | 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            | 	at java.base/java.lang.Thread.run(Thread.java:829)
connect            | Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
connect            | 	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
connect            | 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect            | 	... 13 more
connect            | Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
connect            |  at [Source: (byte[])"Struct{id=2}"; line: 1, column: 8]
connect            | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
connect            |  at [Source: (byte[])"Struct{id=2}"; line: 1, column: 8]
connect            | 	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
connect            | 	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722)
connect            | 	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3560)
connect            | 	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2655)
connect            | 	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:857)
connect            | 	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
connect            | 	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
connect            | 	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
connect            | 	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
connect            | 	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
connect            | 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            | 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            | 	at java.base/java.lang.Thread.run(Thread.java:829)

This is my MySQL source connector's value schema; I got it from Confluent Control Center:

{
  "connect.name": "db_source.example.employees.Envelope",
  "fields": [
    {
      "default": null,
      "name": "before",
      "type": [
        "null",
        {
          "connect.name": "db_source.example.employees.Value",
          "fields": [
            {
              "name": "id",
              "type": "long"
            },
            {
              "name": "name",
              "type": "string"
            },
            {
              "name": "team",
              "type": "string"
            },
            {
              "name": "birthday",
              "type": {
                "connect.name": "io.debezium.time.Date",
                "connect.version": 1,
                "type": "int"
              }
            }
          ],
          "name": "Value",
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "after",
      "type": [
        "null",
        "Value"
      ]
    },
    {
      "name": "source",
      "type": {
        "connect.name": "io.debezium.connector.mysql.Source",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": "long"
          },
          {
            "default": "false",
            "name": "snapshot",
            "type": [
              {
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum",
                "connect.parameters": {
                  "allowed": "true,last,false"
                },
                "connect.version": 1,
                "type": "string"
              },
              "null"
            ]
          },
          {
            "name": "db",
            "type": "string"
          },
          {
            "default": null,
            "name": "sequence",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "default": null,
            "name": "table",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "server_id",
            "type": "long"
          },
          {
            "default": null,
            "name": "gtid",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "file",
            "type": "string"
          },
          {
            "name": "pos",
            "type": "long"
          },
          {
            "name": "row",
            "type": "int"
          },
          {
            "default": null,
            "name": "thread",
            "type": [
              "null",
              "long"
            ]
          },
          {
            "default": null,
            "name": "query",
            "type": [
              "null",
              "string"
            ]
          }
        ],
        "name": "Source",
        "namespace": "io.debezium.connector.mysql",
        "type": "record"
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "default": null,
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "transaction",
      "type": [
        "null",
        {
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "total_order",
              "type": "long"
            },
            {
              "name": "data_collection_order",
              "type": "long"
            }
          ],
          "name": "ConnectDefault",
          "namespace": "io.confluent.connect.avro",
          "type": "record"
        }
      ]
    }
  ],
  "name": "Envelope",
  "namespace": "db_source.example.employees",
  "type": "record"
}

Hi @Abhi

I think the issue is converting the byte[] data into one of the MongoDB types.

I think you will need to play with the converters on each side to get it right:

https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/connect/storage/StringConverter.html
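
For example, something along these lines in the sink connector config might help (just a guess on my side, I haven't tested it):

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"

The key in your error is the literal string "Struct{id=2}", which isn't valid JSON, so treating the key as a plain string instead of JSON may at least get you past the key deserialization error.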

I'm not a Kafka expert, so maybe someone else can help.
Thanks
Pavel

