AGorshkov
(Alexander Gorshkov)
August 25, 2021, 3:02pm
#1
Hello! We're trying to get messages from three collections in one database via a single connector. The pipeline in our config follows the documentation example:
{
  "name": "<connector_name>",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "batch.size": "1000",
    "transforms": "dropPrefix",
    "database": "<db_name>",
    "collection": "",
    "copy.existing.pipeline": "[{\"$match\": {\"ns.coll\": {\"$regex\": /^(\"<collection_1>|<collection_2>|<collection_3>\")$/}}}]",
    "pipeline": "[{\"$match\": {\"ns.coll\": {\"$regex\": /^(\"<collection_1>|<collection_2>|<collection_3>\")$/}}}]",
    "key.converter.schemas.enable": "false",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "connection.uri": "<connection_uri>",
    "name": "<connector_name>",
    "topic.creation.default.partitions": "3",
    "topic.creation.default.replication.factor": "3",
    "value.converter.schemas.enable": "false",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.replacement": "<topic_name>",
    "transforms.dropPrefix.regex": "(.*)<db_name>(.*)",
    "copy.existing": "true",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
The connector starts successfully, but that's all: no messages arrive in the topic. Can anyone tell us what we are doing wrong, please?
Is there anything in the Kafka Connect log?
If you remove the pipeline and copy.existing.pipeline, does it capture events?
Also try removing "collection": "" since you only need to specify the database in your scenario; see the sketch below.
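For reference, that would be your config from above with those three keys dropped (same placeholders; a sketch for isolating the problem, not a final configuration):
{
  "name": "<connector_name>",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "batch.size": "1000",
    "transforms": "dropPrefix",
    "database": "<db_name>",
    "key.converter.schemas.enable": "false",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "connection.uri": "<connection_uri>",
    "name": "<connector_name>",
    "topic.creation.default.partitions": "3",
    "topic.creation.default.replication.factor": "3",
    "value.converter.schemas.enable": "false",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.replacement": "<topic_name>",
    "transforms.dropPrefix.regex": "(.*)<db_name>(.*)",
    "copy.existing": "true",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}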
AGorshkov
(Alexander Gorshkov)
August 25, 2021, 5:37pm
#3
Thank you for the response! Answering your questions:
No, nothing suspicious; the connector successfully connects to the DB, but that's all.
If I remove the "pipeline" and "copy.existing.pipeline" parameters, the connector successfully captures events from the specified collections.
Already tried that; nothing changed.
Let's start with a minimal connector config and go from there:
{
  "name": "<connector_name>",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "database": "<db_name>",
    "connection.uri": "<connection_uri>",
    "name": "<connector_name>",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
See if that generates events.
AGorshkov
(Alexander Gorshkov)
August 26, 2021, 8:57am
#5
After a lot of testing, the connector works with the following configuration:
{
  "name": "<connector_name>",
  "config": {
    "batch.size": "1000",
    "connection.uri": "<connection.uri>",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "copy.existing": "true",
    "database": "<db_name>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "name": "<connector_name>",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "pipeline": "[ { $match: { \"ns.coll\": { \"$in\": [\"<collection_1>\", \"<collection_2>\", \"<collection_3>\" ] } } } ]",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.regex": "(.*)<db_name>(.*)",
    "transforms.dropPrefix.replacement": "<topic_name>",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false"
  }
}
The only major difference is the pipeline format. So here's another question: what could be wrong with the pipeline version from the documentation, or is there another root cause for this issue?
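One observation on the original config (a guess, not confirmed anywhere in this thread): in the documentation-style version, the escaped quotes end up inside the regex group, so the first and last alternatives match "<collection_1> and <collection_3>" with a literal quote character attached, which no real collection name contains. A regex-based pipeline with the quotes closing the JSON string instead, using a plain string pattern, would look something like this (same placeholders):
"pipeline": "[{\"$match\": {\"ns.coll\": {\"$regex\": \"^(<collection_1>|<collection_2>|<collection_3>)$\"}}}]"
The same value would go into copy.existing.pipeline. Whether a given connector version also has trouble parsing the /.../ regex-literal form is worth testing separately.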
AGorshkov
(Alexander Gorshkov)
September 1, 2021, 11:22am
#6
A small update. With the configuration above, after restarting the Kafka Connect node because of an Out Of Memory issue, some kind of topic re-initialization happened: all historical messages were re-uploaded to the topic. What could have caused this?
Thank you.
You have copy.existing set to true, so the connector will copy all the existing data in the collections before opening the change stream and processing current events.
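If I recall the connector's documented behavior correctly, a copy that is interrupted (for example by the OOM restart) starts over from the beginning when the connector resumes, which would explain the re-upload. Once the initial copy has fully completed and the data is in the topic, one workaround (a sketch, assuming you can tolerate updating the connector config afterwards) is to flip the single flag so a later restart can never repeat the copy:
"copy.existing": "false"
The rest of the configuration stays the same.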
AGorshkov
(Alexander Gorshkov)
September 22, 2021, 2:55pm
#8
Is there any way to avoid message duplication while also avoiding message loss, other than using two connectors (one with copy.existing: true and one without)? We need all the existing data, but we don't want to duplicate it: there is a lot of it, and re-uploading causes problems.