Explore MongoDB Change Streams

On this page

  • Explore Change Streams
  • Summary
  • Learn More

Follow this tutorial to learn how to create a change stream on a MongoDB collection and observe the change events it creates.

Note
Before You Get Started

Before you start this tutorial, you must complete the steps in the Kafka Connector Tutorial Setup.

1

Create two interactive shell sessions on the tutorial Docker container, each in a separate window.

This command starts an interactive shell called ChangeStreamShell1:

docker run --rm --name ChangeStreamShell1 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

This command starts an interactive shell called ChangeStreamShell2:

docker run --rm --name ChangeStreamShell2 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

2

In ChangeStreamShell1, create a Python script to open a change stream using the PyMongo driver.

nano openchangestream.py

Paste the following code into the file and save the changes:

import pymongo
from bson.json_util import dumps

# Connect to the tutorial replica set and select the Tutorial1 database.
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')

# Open a change stream on the orders collection and print each event as JSON.
with db.orders.watch() as stream:
    print('\nChange Stream is opened on the Tutorial1.orders namespace. Currently watching ...\n\n')
    for change in stream:
        print(dumps(change, indent=2))

Run the Python script:

python3 openchangestream.py

The script outputs the following message after it starts successfully:

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...

3

In ChangeStreamShell2, connect to MongoDB with mongosh, the MongoDB shell, by running the following command:

mongosh "mongodb://mongo1"

After you connect successfully, you should see the following MongoDB shell prompt:

rs0 [direct: primary] test>

At the prompt, type the following commands:

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

After entering the preceding commands, switch to ChangeStreamShell1 to view the change stream output, which should resemble the following:

{
  "_id": {
    "_data": "826264..."
  },
  "operationType": "insert",
  "clusterTime": {
    "$timestamp": {
      "t": 1650754657,
      "i": 1
    }
  },
  "fullDocument": {
    "_id": {
      "$oid": "62648461d9440c0c72a2202c"
    },
    "test": 1
  },
  "ns": {
    "db": "Tutorial1",
    "coll": "orders"
  },
  "documentKey": {
    "_id": {
      "$oid": "62648461d9440c0c72a2202c"
    }
  }
}

To stop the script, press Ctrl+C.
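
Pressing Ctrl+C raises a KeyboardInterrupt inside the loop that reads the stream, so the script normally exits with a traceback. If you prefer a clean exit, the following is a minimal sketch of one way to handle it. The helper name `print_until_interrupted` is hypothetical and not part of PyMongo; it assumes `stream` is any iterable of events, such as the cursor returned by `watch()`, and uses a stand-in generator here so it runs without a database.

```python
def print_until_interrupted(stream, handle=print):
    """Pass each event to `handle` until the user presses Ctrl+C.

    Returns the number of events handled before the interrupt
    (or before the stream ended).
    """
    count = 0
    try:
        for change in stream:
            handle(change)
            count += 1
    except KeyboardInterrupt:
        # Ctrl+C lands here instead of producing a traceback.
        print("\nStopped watching.")
    return count


def fake_stream():
    """Stand-in for a change stream: two events, then a simulated Ctrl+C."""
    yield {"operationType": "insert"}
    yield {"operationType": "insert"}
    raise KeyboardInterrupt


handled = print_until_interrupted(fake_stream(), handle=lambda change: None)
print(handled, "events handled")
```

In the tutorial script, the call would sit inside the `with db.orders.watch() as stream:` block, so the stream is still closed when the block exits.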

By the end of this step, you've successfully triggered and observed a change stream event.
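
The fields of the event printed in this step can also be read programmatically. The following is a minimal sketch, assuming an insert event with the same shape as the sample output; the helper name `summarize_change` is hypothetical and not part of PyMongo.

```python
def summarize_change(change):
    """Return a one-line summary of a change event dictionary.

    Assumes the event carries the fields shown in the sample output:
    operationType, ns.db, ns.coll, and documentKey._id.
    """
    op = change["operationType"]
    ns = change["ns"]
    namespace = f'{ns["db"]}.{ns["coll"]}'
    doc_id = change["documentKey"]["_id"]
    return f"{op} on {namespace} (documentKey._id={doc_id})"


# Sample event trimmed from the tutorial output. In a live stream, _id is
# a bson ObjectId rather than a plain string.
sample = {
    "operationType": "insert",
    "ns": {"db": "Tutorial1", "coll": "orders"},
    "documentKey": {"_id": "62648461d9440c0c72a2202c"},
    "fullDocument": {"_id": "62648461d9440c0c72a2202c", "test": 1},
}
print(summarize_change(sample))
```

To try it against live events, call `print(summarize_change(change))` inside the `for change in stream:` loop of openchangestream.py.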

4

You can apply a filter to a change stream by passing it an aggregation pipeline.
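
To illustrate how such a pipeline is put together, the sketch below builds a `$match` stage from two parameters. The function name `build_sensor_pipeline` and its parameters are hypothetical. It relies on the fact that conditions listed in a single `$match` document are combined with an implicit AND, which is equivalent to the explicit `$and` form used in the script in this step.

```python
def build_sensor_pipeline(sensor_type, min_value):
    """Build an aggregation pipeline that keeps only change events whose
    document has the given type and a value >= min_value."""
    return [
        {
            "$match": {
                # Conditions in one $match document are ANDed together.
                "fullDocument.type": sensor_type,
                "fullDocument.value": {"$gte": min_value},
            }
        }
    ]


pipeline = build_sensor_pipeline("temp", 100)
print(pipeline)
```

The resulting list can be passed to `watch(pipeline=pipeline)` in the same way as the hard-coded pipeline.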

In ChangeStreamShell1, create a new Python script to open a filtered change stream using the PyMongo driver.

nano pipeline.py

Paste the following code into the file and save the changes:

import pymongo
from bson.json_util import dumps

client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')

# Match only events whose document has type "temp" and a value of 100 or more.
pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

# Open a filtered change stream on the sensors collection.
with db.sensors.watch(pipeline=pipeline) as stream:
    print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
    for change in stream:
        print(dumps(change, indent=2))

Run the Python script:

python3 pipeline.py

The script outputs the following message after it starts successfully:

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...

5

Return to your ChangeStreamShell2 session, which should still be connected to MongoDB through mongosh.

At the prompt, type the following commands:

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

As the script output in ChangeStreamShell1 shows, the change stream creates a change event because the inserted document matches the following pipeline:

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

Try inserting the following documents in ChangeStreamShell2 to verify that the change stream produces events only when the documents match the filter:

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
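
To reason about which inserts should produce events before you run them, the following sketch re-implements the tutorial's filter condition as a plain Python predicate. It is only a local approximation of this one pipeline, not how MongoDB evaluates `$match`; the function name `matches_filter` is hypothetical.

```python
def matches_filter(document):
    """Mirror the tutorial pipeline: type == "temp" and value >= 100."""
    value = document.get("value")
    return (
        document.get("type") == "temp"
        and isinstance(value, (int, float))
        and value >= 100
    )


# The three documents inserted in this step.
candidates = [
    {"type": "temp", "value": 101},
    {"type": "temp", "value": 99},
    {"type": "pressure", "value": 22},
]
results = [matches_filter(doc) for doc in candidates]
for doc, matched in zip(candidates, results):
    print(doc, "->", "event" if matched else "no event")
```

Only the first document satisfies both conditions, so it is the only insert expected to appear in the ChangeStreamShell1 output.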

6

After you complete this tutorial, you can free resources by stopping the tutorial Docker containers and MongoDB data. Navigate to the tutorial directory "mongodb-kafka-base" that you created in the setup step and run the following command:

To restart the containers, follow the same steps required to start them in the Tutorial Setup.

Summary

In this tutorial, you created a change stream on a MongoDB collection and observed the change events it produced. The MongoDB Kafka source connector reads the change events from a change stream that you configure, and writes them to a Kafka topic.

To learn how to configure a change stream and Kafka topic for a source connector, proceed to the Getting Started with the MongoDB Kafka Source Connector tutorial.

Learn More

Read the following resources to learn more about concepts mentioned in this tutorial:

Note

Send your tutorial feedback or ideas for future MongoDB Kafka Connector tutorials through this feedback form.

© 2022 MongoDB, Inc.