This usage example demonstrates how to configure a pipeline to customize the data that your MongoDB Kafka source connector consumes. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data.
MongoDB notifies the connector of data changes that match your aggregation pipeline on a change stream. A change stream is a sequence of events that describe data changes a client made to a MongoDB deployment in real time. For more information, see the MongoDB server manual entry on Change Streams.
Example
Suppose you are coordinating an event and want to collect the name and arrival time of each guest at a specific event. Whenever a guest checks into the event, an application inserts a new document that contains the following details:
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
You can define your connector pipeline setting to instruct the change
stream to filter the change event information as follows:
Create change events for insert operations and omit events for all other types of operations.
Create change events only for documents in which fullDocument.eventId has the value 321 and omit all other documents.
Omit the _id and eventId fields from the fullDocument object using a projection.
To apply these transformations, assign the following aggregation pipeline
to your pipeline setting:
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
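Hand-editing a single-line JSON array is error-prone, so it can help to build the value programmatically. The following is a minimal Python sketch, not part of the connector itself. It assumes you keep the pipeline as Python data and want to render the pipeline line for a properties file. The names PIPELINE and pipeline_property are hypothetical.

```python
import json

# The aggregation pipeline from the text, written as Python data.
PIPELINE = [
    {"$match": {"$and": [{"operationType": "insert"},
                         {"fullDocument.eventId": 321}]}},
    {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}},
]


def pipeline_property(pipeline):
    """Render the pipeline as a single-line `pipeline=...` property.

    json.dumps guarantees well-formed JSON and emits no line breaks
    when no indent is requested.
    """
    return "pipeline=" + json.dumps(pipeline)


if __name__ == "__main__":
    print(pipeline_property(PIPELINE))
```

Parsing the rendered value back with json.loads is a quick way to confirm that the setting still describes the pipeline you intended.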
Important
Make sure that the results of the pipeline contain the top-level
_id and ns fields of the payload object. MongoDB uses
_id as the value of the resume token, and ns to generate the
Kafka output topic name.
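One way to guard against accidentally dropping these fields is to inspect the $project stages before deploying the configuration. The sketch below is a simplified check under stated assumptions: it only looks at $project stages (not $unset, $replaceRoot, or other stages that can also remove fields), and the helper name dropped_required_fields is hypothetical.

```python
REQUIRED = ("_id", "ns")


def dropped_required_fields(pipeline):
    """Return required top-level fields that a $project stage would drop."""
    dropped = set()
    for stage in pipeline:
        spec = stage.get("$project")
        if spec is None:
            continue
        # Inclusion mode: some field other than _id is explicitly kept,
        # so every top-level field that is not mentioned gets removed.
        inclusion = any(value not in (0, False)
                        for key, value in spec.items() if key != "_id")
        for field in REQUIRED:
            if field in spec and spec[field] in (0, False):
                dropped.add(field)  # explicitly excluded
            elif inclusion and field != "_id":
                mentioned = any(key == field or key.startswith(field + ".")
                                for key in spec)
                if not mentioned:
                    dropped.add(field)  # implicitly removed
    return sorted(dropped)


# The pipeline from this example only excludes nested fullDocument fields.
example = [
    {"$match": {"$and": [{"operationType": "insert"},
                         {"fullDocument.eventId": 321}]}},
    {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}},
]
assert dropped_required_fields(example) == []
```

An inclusion-style projection such as {"fullDocument.name": 1} would be flagged for ns, because inclusion projections keep only the listed fields plus _id.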
When the application inserts the sample document, your configured connector publishes the following record to your Kafka topic:
{ ... "payload": { "_id": { "_data": ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z" }, "ns": { ... }, "documentKey": { "_id": {"$oid": ... } } } }
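To see why only this record reaches the topic, the two pipeline stages can be emulated by hand. The Python sketch below is an illustration only: MongoDB evaluates the real pipeline on the server, and the functions here (matches, project, apply_pipeline) are hypothetical stand-ins for the $match and $project stages of this example. The resume token, namespace, and ObjectId values are placeholders.

```python
import copy


def matches(event):
    """Emulate $match: keep inserts whose fullDocument.eventId is 321."""
    full_document = event.get("fullDocument") or {}
    return (event.get("operationType") == "insert"
            and full_document.get("eventId") == 321)


def project(event):
    """Emulate $project: drop fullDocument._id and fullDocument.eventId."""
    result = copy.deepcopy(event)
    for field in ("_id", "eventId"):
        (result.get("fullDocument") or {}).pop(field, None)
    return result


def apply_pipeline(events):
    return [project(event) for event in events if matches(event)]


def change_event(operation, document):
    """Build a simplified change event with placeholder metadata."""
    return {
        "_id": {"_data": "<resume token>"},
        "operationType": operation,
        "fullDocument": document,
        "ns": {"db": "<database>", "coll": "<collection>"},
        "documentKey": {"_id": "<ObjectId>"},
    }


events = [
    change_event("insert", {"_id": "<ObjectId>", "eventId": 321,
                            "name": "Dorothy Gale",
                            "arrivalTime": "2021-10-31T20:30:00.245Z"}),
    # Insert for a different event: removed by the eventId condition.
    change_event("insert", {"_id": "<ObjectId>", "eventId": 999,
                            "name": "Another Guest",
                            "arrivalTime": "2021-10-31T20:31:00.000Z"}),
    # Not an insert: removed by the operationType condition.
    change_event("update", {"_id": "<ObjectId>", "eventId": 321,
                            "name": "Dorothy Gale",
                            "arrivalTime": "2021-10-31T20:45:00.000Z"}),
]

records = apply_pipeline(events)
```

Only the first event survives. Its fullDocument keeps name and arrivalTime, and the top-level _id and ns fields remain in place.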
For more information on managing change streams with the source connector, see the connector documentation on Change Streams.