Issues with Dynamic Routing to MongoDB Collections using Kafka Connector Sink


I’m integrating Kafka with MongoDB using the Kafka-connector sink. My objective is to dynamically route messages to specific MongoDB collections based on their Kafka message keys.

Here’s an example Kafka message key:

  "database": "weather_db",
  "collection": "random_col"

I intend to use this key to dynamically determine the target MongoDB database and collection for each Kafka message.

Below is my current connector configuration:

  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "namespace.mapper.key.collection.field": "$collection",
  "tasks.max": "1",
  "topics": "processed_weather_topic",
  "namespace.mapper.key.database.field": "$database",
  "namespace.mapper": "com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper",
  "key.converter.schemas.enable": "false",
  "database": "local",
  "connection.uri": "mongodb://database:27017",
  "value.converter.schemas.enable": "false",
  "name": "mongo-sink",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"

My expectation is that the namespace.mapper.key.database.field property should route messages to the weather_db database, but it’s currently routing to the default local database.


  1. I also tried using $.database in the configuration.
  2. Excluding "value.converter.schemas.enable": "false" causes the task to fail with a lengthy error message starting with: org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded....

Any guidance or insights on configuring the connector correctly would be greatly appreciated. Thank you!

The configuration I shared is indeed functional. Upon closer examination, I realized the discrepancy was due to the use of single quotes in the message key, rather than the standard double quotes. Consequently, the key I presented in my post (“database”) did not match the one I was testing with (‘database’).

