MongoDB Kafka source connector not shipping data to Kafka topic

I'm using the MongoDB Kafka source connector v1.6, and Kafka Connect is running in distributed mode. The problem is that messages from MongoDB are not published to the respective Kafka topics.
I'm using the topic.namespace.map config, and I see no errors in the logs either. Below is the config file:

{
    "name": "mongo-DB",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",
    "connection.uri": "",
    "database": "DB",
    "copy.existing": "true",
    "copy.existing.namespace.regex": "DB.coll1$|DB.coll2$|DB.coll3$|DB.coll4",
    "topic.namespace.map": "{\"DB.coll1\\\" : \"topic1\",\"DB.coll2\\\" : \"topic2\",\"DB.coll3\\\" : \"topic3\",\"DB.coll4\\\" : \"topic4\"}",
    "poll.max.batch.size": "1000",
    "poll.await.time.ms": "5000",
    "pipeline": "[{\"$match\":{\"ns.coll\": {\"$regex\": \"\/^(DB.coll1|DB.coll2|DB.coll3|DB.coll4)$\/\"}}}]",
    "batch.size": "1",
    "change.stream.full.document": "updateLookup",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "publish.full.document.only": "true"
}

logs:

INFO Opened connection [connectionId{localValue:7, serverValue:283461}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:14,616] INFO Opened connection [connectionId{localValue:8, serverValue:283462}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:16,021] INFO Copying existing data on the following namespaces: [ecaf-staging.augmentPlanRelationship, ecaf-staging.augmentPlan, ecaf-staging.device, ecaf-staging.location] (com.mongodb.kafka.connect.source.MongoCopyDataManager:104)
[2021-12-05 10:59:16,035] INFO Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:203)
[2021-12-05 10:59:16,036] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:225)
[2021-12-05 10:59:16,386] INFO Opened connection [connectionId{localValue:9, serverValue:283463}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:16,394] INFO Opened connection [connectionId{localValue:10, serverValue:283464}] to DB (org.mongodb.driver.connection:71)
[2021-12-05 10:59:24,042] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:31,037] INFO Shutting down executors (com.mongodb.kafka.connect.source.MongoSourceTask:604)
[2021-12-05 10:59:31,037] INFO Finished copying existing data from the collection(s). (com.mongodb.kafka.connect.source.MongoSourceTask:611)
[2021-12-05 10:59:31,038] INFO Watching for database changes on 'ecaf-staging' (com.mongodb.kafka.connect.source.MongoSourceTask:677)
[2021-12-05 10:59:31,066] INFO Resuming the change stream after the previous offset: {"_data": "8261AC9B83000023282B0229296E04"} (com.mongodb.kafka.connect.source.MongoSourceTask:415)
[2021-12-05 10:59:34,043] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:44,044] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 10:59:54,045] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:04,045] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:14,053] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:24,053] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:34,054] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-12-05 11:00:44,055] INFO WorkerSourceTask{id=mongo-ecaf-staging-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)

Did you find any solution? I'm having a very similar issue. I also see messages in the logs like:
INFO Opened connection [connectionId{localValue:21, serverValue:3339}] to
myserver.host.name:1025 (org.mongodb.driver.connection:71)
Copying existing data on the following namespaces: [myDb.myCollection]
followed a short time later by:
Finished copying existing data from the collection(s).

Yet, no topic is created in Kafka / no messages. Any suggestions/thoughts?

I think there are just a few small issues with your connector configuration.
Give this a try; it should hopefully work on your end too.

{
    "name": "mdbsrc",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "tasks.max": "1",
        "connection.uri": "mongodb://mongodb:27017",
        "database": "DB",
        "copy.existing": "true",
        "copy.existing.namespace.regex": "DB.coll[1-4]$",
        "topic.namespace.map": "{\"DB.coll1\" : \"topic1\",\"DB.coll2\" : \"topic2\",\"DB.coll3\" : \"topic3\",\"DB.coll4\" : \"topic4\"}",
        "pipeline": "[   { $match: { \"ns.coll\": { $regex: /^coll[1-4]$/ } } } ]",
        "poll.max.batch.size": "1000",
        "poll.await.time.ms": "5000",
        "batch.size": "1",
        "change.stream.full.document": "updateLookup",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "publish.full.document.only": "true"
    }
}
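
To register it, you can POST that JSON to the Kafka Connect REST API. A minimal sketch, assuming the worker listens on the default localhost:8083 and the config above is saved as mdbsrc.json (both assumptions; adjust for your deployment):

    curl -X POST -H "Content-Type: application/json" \
         --data @mdbsrc.json \
         http://localhost:8083/connectors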

The most important difference from your example: when you use $match on ns.coll, it refers to the collection name only, NOT the combination of database + collection name. If you want to match the database name as well, you have to add a match against ns.db too. Also, your topic mapping config contains additional \\ characters which would not match and would thus lead to default topic names on the Kafka side. And I simplified the regex a bit because I'm lazy with typing :wink:
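
For example, a pipeline that pins both the database and the collection names could look like the following. This is just a sketch reusing the DB/coll names from above:

    "pipeline": "[{\"$match\": {\"ns.db\": \"DB\", \"ns.coll\": {\"$regex\": \"^coll[1-4]$\"}}}]"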

The solution to my specific issue was to create the topic manually. I was assuming it would auto-create, but, well… Anyway, once I created the topic, my connector worked.
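
In case it helps others, a topic can be created manually with the standard Kafka CLI. A sketch, assuming a broker reachable at localhost:9092 and a single-node setup (adjust the topic name, partition count, and replication factor to your cluster):

    bin/kafka-topics.sh --create --topic topic1 \
        --bootstrap-server localhost:9092 \
        --partitions 1 --replication-factor 1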

I put in the following lines and the topic auto-created when data flowed in:

"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "-1",
"topic.creation.default.partitions": "-1"
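
One note on where these go: topic.creation.enable is a Connect worker property (it defaults to true), while the topic.creation.default.* properties belong in the connector config itself; -1 tells Connect to fall back to the broker's default partition count and replication factor. A sketch of the worker side, assuming a distributed worker:

    # connect-distributed.properties
    topic.creation.enable=true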

Oh - I will have to try that! Thanks for the info.