Listen for Changes on Multiple Sources

This usage example demonstrates how to configure a MongoDB Kafka source connector to listen for change events on multiple MongoDB collections and publish them to a Kafka topic.

If you need your connector to listen for change events on a more specific set of databases and collections, you can use a pipeline. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data. See the next section for examples of how to configure your connector's pipeline setting to match multiple database and collection names using a regular expression.

Note

The database and collection configuration settings also determine which databases and collections the connector listens to for change events. To learn more about these settings, see the MongoDB Source Connection Properties guide.

The following examples show you how to use an aggregation pipeline to select specific database or collection names on which to listen for change events.

You can define an aggregation pipeline to select only change events on multiple databases by specifying the following in the pipeline setting:

  • A $match aggregation operator

  • The ns.db field, which identifies the database part of the namespace

  • The $regex operator and a regular expression that matches the database names

The following sample configuration shows how you can set your source connector to listen for change events on the sandbox and firewall databases:

pipeline=[{"$match": {"ns.db": {"$regex": "^(sandbox|firewall)$"}}}]
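As a rough local check of the database filter, the following Python sketch builds a pipeline value for a list of database names and tests the regular expression against a few sample names. The helper `build_db_pipeline` and the sample names `sandbox2` and `prod` are hypothetical and not part of the connector. The pattern is written without surrounding slashes, on the assumption that a JSON string passed to $regex is treated as the pattern itself. Python's `re` module only approximates MongoDB's regular expression engine, which is close enough for a simple anchored alternation like this one.

```python
import json
import re

# Hypothetical helper (not part of the connector): builds the value of the
# `pipeline` setting for a list of database names.
def build_db_pipeline(db_names):
    # Escape each name so regex metacharacters are matched literally.
    pattern = "^(" + "|".join(re.escape(name) for name in db_names) + ")$"
    return json.dumps([{"$match": {"ns.db": {"$regex": pattern}}}])

setting = build_db_pipeline(["sandbox", "firewall"])
print("pipeline=" + setting)

# Approximate the filter locally with Python's `re` module.
pattern = json.loads(setting)[0]["$match"]["ns.db"]["$regex"]
for db in ["sandbox", "firewall", "sandbox2", "prod"]:
    print(db, bool(re.search(pattern, db)))
```

Because the pattern is anchored with ^ and $, a name such as sandbox2 is not selected, which is usually what you want when listing databases explicitly.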

You can define an aggregation pipeline to ignore change events on multiple collections by specifying the following in the pipeline setting:

  • A $match aggregation operator

  • The ns.coll field, which identifies the collection part of the namespace

  • The $regex operator and a regular expression that matches the collection names

  • The $not operator, which wraps the $regex expression so that the stage matches every collection name the regular expression does not match

The following sample configuration shows how you can set your source connector to filter out change events that originate from all collections named "hyperspace" in any database:

pipeline=[{"$match": {"ns.coll": {"$not": {"$regex": "^hyperspace$"}}}}]
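The intended effect of this filter, keeping every change event whose collection name is not exactly hyperspace, can be illustrated locally with a short Python sketch. The sample events and the `passes_filter` function are hypothetical and reduce each change event to its `ns` field. This only mimics the filtering logic and does not run against the connector or a MongoDB deployment.

```python
import re

# Hypothetical change events, reduced to the `ns` field used by the filter.
events = [
    {"ns": {"db": "sandbox", "coll": "hyperspace"}},
    {"ns": {"db": "sandbox", "coll": "hyperspace2"}},
    {"ns": {"db": "firewall", "coll": "rules"}},
    {"ns": {"db": "firewall", "coll": "hyperspace"}},
]

EXCLUDED = re.compile(r"^hyperspace$")

def passes_filter(event):
    # Mirrors the intent of combining $not with $regex: keep the event
    # unless the collection name matches the pattern.
    return EXCLUDED.search(event["ns"]["coll"]) is None

kept = [e["ns"] for e in events if passes_filter(e)]
print(kept)
```

Events from hyperspace are dropped in both databases, while hyperspace2 is kept because the anchored pattern matches only the exact name.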