Hello,
I’m integrating Kafka with MongoDB using the MongoDB Kafka sink connector. My objective is to dynamically route messages to specific MongoDB databases and collections based on their Kafka message keys.
Here’s an example Kafka message key:
{
  "database": "weather_db",
  "collection": "random_col"
}
I intend to use this key to dynamically determine the target MongoDB database and collection for each Kafka message.
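For context, a message with this kind of key can be produced along these lines (a minimal sketch using the confluent-kafka Python client; the broker address and the sample value payload are placeholders, not my actual pipeline):

import json

from confluent_kafka import Producer

# Broker address is a placeholder for illustration.
producer = Producer({"bootstrap.servers": "localhost:9092"})

# The key carries the routing information; the value is a made-up sample document.
key = {"database": "weather_db", "collection": "random_col"}
value = {"city": "Oslo", "temperature_c": 3.5}

producer.produce(
    "processed_weather_topic",
    key=json.dumps(key).encode("utf-8"),
    value=json.dumps(value).encode("utf-8"),
)
producer.flush()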
Below is my current connector configuration:
{
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "namespace.mapper.key.collection.field": "$collection",
  "tasks.max": "1",
  "topics": "processed_weather_topic",
  "namespace.mapper.key.database.field": "$database",
  "namespace.mapper": "com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper",
  "key.converter.schemas.enable": "false",
  "database": "local",
  "connection.uri": "mongodb://database:27017",
  "value.converter.schemas.enable": "false",
  "name": "mongo-sink",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
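For completeness, the configuration is applied through the Kafka Connect REST API, roughly like this (sketch only; the Connect host, port, and config file name are placeholders):

import json

import requests

# Connect REST endpoint and config file name are placeholders.
CONNECT_URL = "http://localhost:8083"

with open("mongo-sink.json") as f:
    config = json.load(f)  # the JSON configuration shown above

# PUT /connectors/{name}/config creates the connector or updates its configuration.
resp = requests.put(f"{CONNECT_URL}/connectors/mongo-sink/config", json=config)
resp.raise_for_status()
print(resp.json())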
My expectation is that the namespace.mapper.key.database.field property would route messages to the weather_db database, but they are currently being written to the default local database instead.
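To make the symptom concrete, a check along these lines (pymongo sketch; the connection string is taken from the config above and the hostname may need adjusting depending on where it runs) shows the documents under local rather than weather_db:

from pymongo import MongoClient

# Connection string copied from the connector config; adjust for wherever this runs.
client = MongoClient("mongodb://database:27017")

# Expected outcome: weather_db exists and random_col contains the sunk documents.
print(client.list_database_names())
print(client["weather_db"]["random_col"].count_documents({}))

# What actually happens: the documents show up under the default "local" database.
print(client["local"].list_collection_names())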
Notes:
- I also tried using "$.database" in the configuration.
- Excluding "value.converter.schemas.enable": "false" causes the task to fail with a lengthy error message starting with: org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded...
Any guidance or insights on configuring the connector correctly would be greatly appreciated. Thank you!