This usage example demonstrates how to configure a MongoDB Kafka source connector to listen for change events on multiple MongoDB collections and publish them to a Kafka topic.
If you need your connector to listen for change events on a more specific set of databases and collections, you can use a pipeline. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data. See the next section for examples of how to configure your connector's pipeline setting to match multiple database and collection names using a regular expression.
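Because the pipeline setting holds a JSON array of aggregation stages, hand-written quoting is easy to get wrong. The following minimal Python sketch assumes you keep the connector configuration in a properties file and builds the pipeline as data, then serializes it with json.dumps. The to_properties helper and the connection URI are hypothetical; connector.class, connection.uri, and pipeline are the connector's setting names. The stage shown matches change events from the sandbox and firewall databases.

```python
import json


# Hypothetical helper: renders a connector configuration dict as
# properties-file lines. Non-string values (such as the pipeline list)
# are serialized with json.dumps, so the result is always valid JSON.
# Note: it does not escape backslashes, which a Java properties file
# treats as escape characters, so keep patterns backslash-free here.
def to_properties(config: dict) -> str:
    lines = []
    for key, value in config.items():
        if not isinstance(value, str):
            value = json.dumps(value)
        lines.append(f"{key}={value}")
    return "\n".join(lines)


# Include change events only from the sandbox and firewall databases.
pipeline = [{"$match": {"ns.db": {"$regex": "^(sandbox|firewall)$"}}}]

config = {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    # Placeholder URI (assumption); use your own deployment's connection string.
    "connection.uri": "mongodb://mongo1:27017/?replicaSet=rs0",
    "pipeline": pipeline,
}

print(to_properties(config))
```

The last printed line is the pipeline setting, ready to paste into the connector's properties file.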
Note
The database and collection configuration settings also affect which databases and collections the connector listens to for change events. To learn more about these settings, see the MongoDB Source Connection Properties guide.
Examples
The following examples show you how to use an aggregation pipeline to select specific database or collection names on which to listen for change events.
Include Change Events from Multiple Databases
You can define an aggregation pipeline to select only change events on multiple databases by specifying the following in the pipeline setting:
- A $match aggregation operator
- The ns.db field, which identifies the database part of the namespace
- The $regex operator and a regular expression that matches the database names
The following sample configuration shows how you can set your source connector to listen for change events on the sandbox and firewall databases:
pipeline=[{"$match": {"ns.db": {"$regex": "^(sandbox|firewall)$"}}}]
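To see which change events a database filter keeps, the following Python sketch simulates a $match stage equivalent to the one above, using the pattern ^(sandbox|firewall)$, against a few sample change events. It is an approximation: Python's re module stands in for the server's regular expression engine, and the sample events and the passes_db_filter name are hypothetical. The point it illustrates is that $regex, like re.search, is unanchored, so the ^ and $ anchors are what restrict the filter to whole database names.

```python
import re

# Hypothetical stand-in for the server evaluating
#   {"$match": {"ns.db": {"$regex": "^(sandbox|firewall)$"}}}
# Python's re substitutes for the server's regex engine; this simple
# pattern has the same meaning in both.
DB_PATTERN = re.compile(r"^(sandbox|firewall)$")


def passes_db_filter(event: dict) -> bool:
    db = event.get("ns", {}).get("db")
    # $regex does not match documents that lack the field.
    if not isinstance(db, str):
        return False
    # Like $regex, re.search is unanchored; ^ and $ force a whole-name match.
    return DB_PATTERN.search(db) is not None


events = [
    {"operationType": "insert", "ns": {"db": "sandbox", "coll": "users"}},
    {"operationType": "update", "ns": {"db": "firewall", "coll": "rules"}},
    {"operationType": "insert", "ns": {"db": "sandbox_archive", "coll": "users"}},
    {"operationType": "delete", "ns": {"db": "production", "coll": "orders"}},
]

for event in events:
    print(event["ns"]["db"], passes_db_filter(event))
# sandbox True
# firewall True
# sandbox_archive False
# production False

# Without the anchors, "sandbox_archive" would also pass:
print(re.search(r"sandbox|firewall", "sandbox_archive") is not None)  # True
```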
Exclude Change Events from Multiple Collections
You can define an aggregation pipeline to ignore change events on multiple collections by specifying the following in the pipeline setting:
- A $match aggregation operator
- The ns.coll field, which identifies the collection part of the namespace
- The $regex operator and a regular expression that matches the collection names
- The $not operator, which wraps the $regex expression so that the stage matches every collection name the regular expression does not match
The following sample configuration shows how you can set your source connector to filter out change events that originate from all collections named "hyperspace" in any database:
pipeline=[{"$match": {"ns.coll": {"$not": {"$regex": "^hyperspace$"}}}}]
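The exclusion filter behaves as the inverse of a plain $regex match, with one subtlety: in MongoDB, $not also matches documents that do not contain the field at all. The following Python sketch models the intended behavior (a change event passes when its ns.coll does not match ^hyperspace$) on a few hypothetical sample events, including a dropDatabase event, whose namespace carries only a database name. As before, Python's re stands in for the server's regex engine, and passes_coll_filter is a hypothetical name.

```python
import re

# Hypothetical stand-in for the server evaluating
#   {"$match": {"ns.coll": {"$not": {"$regex": "^hyperspace$"}}}}
COLL_PATTERN = re.compile(r"^hyperspace$")


def passes_coll_filter(event: dict) -> bool:
    coll = event.get("ns", {}).get("coll")
    # $not also matches documents that lack the field, so events without a
    # collection name (for example, dropDatabase) are kept.
    if not isinstance(coll, str):
        return True
    # Keep the event only when the collection name does NOT match.
    return COLL_PATTERN.search(coll) is None


events = [
    {"operationType": "insert", "ns": {"db": "galaxy", "coll": "hyperspace"}},
    {"operationType": "update", "ns": {"db": "fleet", "coll": "hyperspace"}},
    {"operationType": "insert", "ns": {"db": "galaxy", "coll": "hyperspace_logs"}},
    {"operationType": "dropDatabase", "ns": {"db": "galaxy"}},
]

for event in events:
    print(event["ns"].get("coll"), passes_coll_filter(event))
# hyperspace False
# hyperspace False
# hyperspace_logs True
# None True
```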
Additional Information
Regular expression syntax using the Pattern class