Mongodb connector to Kafka

i am facing some error on source connector config . my config is to collect data from multiple databases and collections from same mongodb host and publish to same topic .

below is config i am using , but getting error

< {
“name” : “mongo-source”,
“config” : {
“batch.size” : “1000”,
“connection.uri” : “mongodb://: @*********************:1025/?ssl”,
“connector.class” : “com.mongodb.kafka.connect.MongoSourceConnector”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“pipeline”: “[ {“$match”: {$or: [ {“ns.db”: “uat_move5app”, “ns.coll”: “AccessToken”}, {“ns.db”: “uat_move5app”, “ns.coll”: “Account”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “Achievement”}, {“ns.db”: “uat_move5health”, “ns.coll”:“AppleRing”}, {“ns.db”: “uat_move5app”, “ns.coll”: “Application”}, {“ns.db”: “uat_move5app”, “ns.coll”: “AuditLog”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “Badge”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “Challenge”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “Code”}, {“ns.db”: “uat_move5app”, “ns.coll”: “Country”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “Goal”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “GoalReward”}, {“ns.db”: “uat_move5tracker”, “ns.coll”: “HealthNotification”}, {“ns.db”: “uat_move5health”, “ns.coll”: “HealthSummary”}, {“ns.db”: “uat_move5tracker”, “ns.coll”: “HealthTracker”}, {“ns.db”: “uat_move5app”, “ns.coll”: “Installation”}, {“ns.db”: “uat_move5cas”, “ns.coll”: “HPMember”}, {“ns.db”: “uat_move5cas”, “ns.coll”: “MoveKey”}, {“ns.db”: “uat_move5app”, “ns.coll”: “Muser”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “Participation”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “Program”}, {“ns.db”: “uat_move5notification”, “ns.coll”: “PushNotification”}, {“ns.db”: “uat_move5notification”, “ns.coll”: “PushResponse”}, {“ns.db”: “uat_move5notification”, “ns.coll”: “PushSubscription”}, {“ns.db”: “uat_move5queue”, “ns.coll”: “QueueError”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “Reward”}, {“ns.db”: “uat_move5app”, “ns.coll”: “RoleMapping”}, {“ns.db”: “uat_move5app”, “ns.coll”: “Role”}, {“ns.db”: “uat_move5queue”,“ns.coll”: “Task”}, {“ns.db”: “uat_move5queue”, “ns.coll”: “TaskConfig”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “UserBadge”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “UserCode”}, {“ns.db”: “uat_move5challenge”, “ns.coll”:“UserGoal”}, {“ns.db”: “uat_move5challenge”, “ns.coll”: “UserReward”}, {“ns.db”: “uat_move5app”, “ns.coll”: “UserState”}, {“ns.db”: “uat_move5message”, “ns.coll”: “DestinationMapping”}, {“ns.db”: “uat_move5message”, “ns.coll”: “FollowUpMapping”}, {“ns.db”: “uat_move5health-score”, “ns.coll”: “HealthProfile”}, {“ns.db”: “uat_move5health-score”, “ns.coll”: “HealthScore”}, {“ns.db”: “uat_move5health-score”, “ns.coll”: “HealthScoreDelta”}, {“ns.db”: “uat_move5health-score”, “ns.coll”: “ProviderAccount”}, {“ns.db”: “uat_move5health-score”, “ns.coll”: “SurveyQuestion”}, {“ns.db”: “uat_move5message”, “ns.coll”: “SystemMessage”}, {“ns.db”: “uat_move5message”, “ns.coll”: “UserMessage”},{“ns.db”: “uat_move5health-score”, “ns.coll”: “UserSurvey”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “HealthSummary”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “AppleRing”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “UserReward”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “UserGoal”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “UserState”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “Participation”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “HealthScore”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “Muser”}, {“ns.db”: “perf_move5edl”, “ns.coll”: “Account”} ] } } ]”,
“topic.prefix”: “SG_uat_move5app.Installation”
}
}

Error:
curl -X PUT -H “Content-Type: application/json” --data @./test.json http://localhost:8083/connectors/MongoSourceConnectorV1/config
{“error_code”:500,“message”:“Cannot deserialize value of type java.lang.String from Object value (token JsonToken.START_OBJECT)\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 53] (through reference chain: java.util.LinkedHashMap[“config”])”}

I faced a similar error here:

Hi @vasireddy_prasanth,

Seems like Kafka is throwing an error when trying to read the configuraiton.

Please escape quotes in the pipeline config and that should help.

Ross

Hi Ross, Thanks for info
may i know which quotes you want me to escape

Hi @vasireddy_prasanth,

Its the pipeline configuration:

“pipeline”:  "[ {\"$match\": {$or: [ {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"AccessToken\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Account\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Achievement\"}, {\"ns.db\": \"uat_move5health\", \"ns.coll\":\"AppleRing\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Application\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"AuditLog\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Badge\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Challenge\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Code\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Country\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Goal\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"GoalReward\"}, {\"ns.db\": \"uat_move5tracker\", \"ns.coll\": \"HealthNotification\"}, {\"ns.db\": \"uat_move5health\", \"ns.coll\": \"HealthSummary\"}, {\"ns.db\": \"uat_move5tracker\", \"ns.coll\": \"HealthTracker\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Installation\"}, {\"ns.db\": \"uat_move5cas\", \"ns.coll\": \"HPMember\"}, {\"ns.db\": \"uat_move5cas\", \"ns.coll\": \"MoveKey\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Muser\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Participation\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Program\"}, {\"ns.db\": \"uat_move5notification\", \"ns.coll\": \"PushNotification\"}, {\"ns.db\": \"uat_move5notification\", \"ns.coll\": \"PushResponse\"}, {\"ns.db\": \"uat_move5notification\", \"ns.coll\": \"PushSubscription\"}, {\"ns.db\": \"uat_move5queue\", \"ns.coll\": \"QueueError\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"Reward\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"RoleMapping\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"Role\"}, {\"ns.db\": \"uat_move5queue\",\"ns.coll\": \"Task\"}, {\"ns.db\": \"uat_move5queue\", \"ns.coll\": \"TaskConfig\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"UserBadge\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"UserCode\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\":\"UserGoal\"}, {\"ns.db\": \"uat_move5challenge\", \"ns.coll\": \"UserReward\"}, {\"ns.db\": \"uat_move5app\", \"ns.coll\": \"UserState\"}, {\"ns.db\": \"uat_move5message\", \"ns.coll\": \"DestinationMapping\"}, {\"ns.db\": \"uat_move5message\", \"ns.coll\": \"FollowUpMapping\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"HealthProfile\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"HealthScore\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"HealthScoreDelta\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"ProviderAccount\"}, {\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"SurveyQuestion\"}, {\"ns.db\": \"uat_move5message\", \"ns.coll\": \"SystemMessage\"}, {\"ns.db\": \"uat_move5message\", \"ns.coll\": \"UserMessage\"},{\"ns.db\": \"uat_move5health-score\", \"ns.coll\": \"UserSurvey\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"HealthSummary\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"AppleRing\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"UserReward\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"UserGoal\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"UserState\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"Participation\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"HealthScore\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"Muser\"}, {\"ns.db\": \"perf_move5edl\", \"ns.coll\": \"Account\"} ] } } ]"

Thanks Ross let me check