MongoDB Kafka source connector pipeline for multiple collections isn't working

Hello! We’re trying to get messages from three collections in one database via a single connector. The pipeline in our config is similar to the one in the documentation:

  {
    "name": "<connector_name>",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "batch.size": "1000",
      "transforms": "dropPrefix",
      "database": "<db_name>",
      "collection": "",
      "copy.existing.pipeline": "[{\"$match\": {\"ns.coll\": {\"$regex\": /^(\"<collection_1>|<collection_2>|<collection_3>\")$/}}}]",
      "pipeline": "[{\"$match\": {\"ns.coll\": {\"$regex\": /^(\"<collection_1>|<collection_2>|<collection_3>\")$/}}}]",
      "key.converter.schemas.enable": "false",
      "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
      "connection.uri": "<connection_uri>",
      "name": "<connector_name>",
      "topic.creation.default.partitions": "3",
      "topic.creation.default.replication.factor": "3",
      "value.converter.schemas.enable": "false",
      "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.dropPrefix.replacement": "<topic_name>",
      "transforms.dropPrefix.regex": "(.*)<db_name>(.*)",
      "copy.existing": "true",
      "value.converter": "",
      "key.converter": ""
    }
  }

The connector starts successfully, but no messages arrive in the topic. Can anyone tell us what we are doing wrong?

Is there anything in the Kafka Connect log?

If you remove pipeline and copy.existing.pipeline, does it capture events?

Also try removing "collection": "", since you only need to specify the database in your scenario.

Thank you for the response! Answering your questions:

  1. No, nothing suspicious. The connector connects to the DB successfully, but that’s all.
  2. If we remove the "pipeline" and "copy.existing.pipeline" parameters, the connector successfully captures events from the specified collection.
  3. Already tried it; nothing changed.

Let’s start with the minimum connector config and go from there:

  {
    "name": "<connector_name>",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "database": "<db_name>",
      "connection.uri": "<connection_uri>",
      "name": "<connector_name>",
      "value.converter": "",
      "key.converter": ""
    }
  }

See if that generates events.

After a lot of tests, the connector works with the following configuration:

  {
    "name" : "<connector_name>",
    "config" : {
      "batch.size" : "1000",
      "connection.uri" : "<connection.uri>",
      "connector.class" : "com.mongodb.kafka.connect.MongoSourceConnector",
      "copy.existing" : "true",
      "database" : "<db_name>",
      "key.converter" : "",
      "key.converter.schemas.enable" : "false",
      "name" : "<connector_name>",
      "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
      "pipeline" : "[   { $match: { \"ns.coll\": { \"$in\": [\"<collection_1>\", \"<collection_2>\", \"<collection_3>\" ] } } } ]",
      "transforms" : "dropPrefix",
      "transforms.dropPrefix.regex" : "(.*)<db_name>(.*)",
      "transforms.dropPrefix.replacement" : "<topic_name>",
      "transforms.dropPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
      "value.converter" : "",
      "value.converter.schemas.enable" : "false"
    }
  }

The only major difference is the pipeline format. So there’s another question: what is wrong with the pipeline version from the documentation, or is there another root cause of this issue?
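
One detail that may be worth checking in the regex version: the escaped quotes sit inside the group, so they become part of the first and last alternatives. The sketch below uses Python's re module only to illustrate how such an alternation behaves; the connector itself hands the regex to MongoDB, and the collection names are hypothetical stand-ins for the placeholders. Since the middle alternative still matches, this alone probably does not explain a complete absence of messages.

```python
import re

# Hypothetical collection names standing in for <collection_1..3>.
collections = ["orders", "customers", "invoices"]

# Shaped like the pattern in the first config: the quotes are inside
# the group, so they belong to the first and last alternatives.
quoted = re.compile(r'^("orders|customers|invoices")$')

# Shaped like a pattern without the embedded quotes.
plain = re.compile(r'^(orders|customers|invoices)$')


def matching(pattern, names):
    """Return the names that the anchored pattern matches."""
    return [name for name in names if pattern.search(name)]


print(matching(quoted, collections))  # only the middle name matches
print(matching(plain, collections))   # all three names match
```

The "$in" form in the working config avoids the question entirely, because it compares whole collection names and needs no regex escaping.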

Small update: with the configuration above, after the Kafka Connect node was restarted because of an out-of-memory issue, some kind of topic re-initialization happened and all historical messages were re-uploaded to the topic. What could have caused this?
Thank you.

You have copy.existing set to true, so the connector copies all the existing data in the collection before opening the change stream and processing current events.

Is there any way to avoid both message duplication and message loss, other than using two connectors (one with copy.existing: true and one without)? We need all the existing data but don’t want it duplicated, because there is a lot of it and re-uploading causes issues.


I am facing a similar issue with a source connector config. My config is meant to collect data from multiple databases and collections on the same MongoDB host and publish it to the same topic.

Below is the config I am using, but I am getting an error:

  {
    "name": "mongo-source",
    "config": {
      "batch.size": "1000",
      "connection.uri": "mongodb://:@*********************:1025/?ssl",
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter": "",
      "value.converter": "",
      "pipeline": "[ {\"$match\": {$or: [ {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"AccessToken\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Account\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Achievement\"}, {\"ns.db\": \"uat_move5health\", \"ns.coll\": \"AppleRing\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Application\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"AuditLog\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Badge\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Challenge\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Code\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Country\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Goal\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"GoalReward\"}, {\"ns.db\": \"uat_move5tracker\", \"ns.coll\": \"HealthNotification\"}, {\"ns.db\": \"uat_move5health\", \"ns.coll\": \"HealthSummary\"}, {\"ns.db\": \"uat_move5tracker\", \"ns.coll\": \"HealthTracker\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Installation\"}, {\"ns.db\": \"uat_move5cas\", \"ns.coll\": \"HPMember\"}, {\"ns.db\": \"uat_move5cas\", \"ns.coll\": \"MoveKey\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Muser\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Participation\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Program\"}, {\"ns.db\": \"uat_move5notification\", \"ns.coll\": \"PushNotification\"}, {\"ns.db\": \"uat_move5notification\", \"ns.coll\": \"PushResponse\"}, {\"ns.db\": \"uat_move5notification\", \"ns.coll\": \"PushSubscription\"}, {\"ns.db\": \"uat_move5queue\", \"ns.coll\": \"QueueError\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Reward\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"RoleMapping\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Role\"}, {\"ns.db\": \"uat_move5queue\", \"ns.coll\": \"Task\"}, {\"ns.db\": \"uat_move5queue\", \"ns.coll\": \"TaskConfig\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"UserBadge\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"UserCode\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"UserGoal\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"UserReward\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"UserState\"}, {\"ns.db\": \"uat_move5message\", \"ns.coll\": \"DestinationMapping\"}, {\"ns.db\": \"uat_move5message\", \"ns.coll\": \"FollowUpMapping\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"HealthProfile\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"HealthScore\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"HealthScoreDelta\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"ProviderAccount\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"SurveyQuestion\"}, {\"ns.db\": \"uat_move5message\", \"ns.coll\": \"SystemMessage\"}, {\"ns.db\": \"uat_move5message\", \"ns.coll\": \"UserMessage\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"UserSurvey\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"HealthSummary\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"AppleRing\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"UserReward\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"UserGoal\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"UserState\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"Participation\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"HealthScore\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"Muser\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"Account\"} ] } } ]",
      "topic.prefix": "SG_uat_move5app.Installation"
    }
  }

curl -X PUT -H "Content-Type: application/json" --data @./test.json http://localhost:8083/connectors/MongoSourceConnectorV1/config
{"error_code":500,"message":"Cannot deserialize value of type java.lang.String from Object value (token JsonToken.START_OBJECT)\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 53] (through reference chain: java.util.LinkedHashMap[\"config\"])"}
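
The reference chain ending in LinkedHashMap["config"] suggests that the request body still carries the {"name": ..., "config": {...}} wrapper. In the Kafka Connect REST API, PUT /connectors/{name}/config takes the flat map of string settings as its body, while the wrapper form belongs to POST /connectors. Below is a minimal sketch, assuming test.json holds the wrapper form; the helper names build_pipeline and flat_config are hypothetical, and only two of the namespaces from the post are listed. Building the pipeline with json.dumps also keeps the inner quotes escaped.

```python
import json


def build_pipeline(namespaces):
    """Serialize a $match/$or pipeline for (db, coll) pairs as a JSON string."""
    clauses = [{"ns.db": db, "ns.coll": coll} for db, coll in namespaces]
    return json.dumps([{"$match": {"$or": clauses}}])


def flat_config(payload):
    """Return the flat string map for PUT /connectors/{name}/config.

    Accepts either the {"name": ..., "config": {...}} wrapper used with
    POST /connectors or an already flat map.
    """
    config = payload.get("config", payload)
    if not isinstance(config, dict):
        raise ValueError("'config' must be a JSON object")
    # Every value must be a string; serialize anything that is not.
    return {
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in config.items()
    }


# Two namespaces taken from the post above; extend the list as needed.
namespaces = [("uat_move5app", "AccessToken"), ("uat_move5health", "AppleRing")]

wrapper = {
    "name": "mongo-source",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "batch.size": 1000,
        "pipeline": build_pipeline(namespaces),
    },
}

# This is the body to save to the file passed to curl --data.
body = json.dumps(flat_config(wrapper), indent=2)
print(body)
```

Alternatively, the wrapper file can be kept as it is and sent with POST to http://localhost:8083/connectors when the connector does not exist yet.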