MongoDB Change Streams with Python
Change streams allow you to listen to changes that occur in your MongoDB database. On MongoDB 3.6 or above, this functionality allows you to build applications that can immediately respond to real-time data changes. In this tutorial, we'll show you how to use change streams with Python. In particular, you will:
- Learn about change streams
- Create a program that listens to inserts
- Change the program to listen to other event types
- Change the program to listen to specific notifications
To follow along, you can create a test environment using the steps below. This is optional but highly encouraged as it will allow you to test usage of the change stream functionality with the examples provided. You will be given all commands, but some familiarity with MongoDB is needed.
The ability to listen to specific changes in the data allows an application to be much faster in responding to change. If a user of your system updates their information, the system can listen to and propagate these changes right away. For example, this could mean users no longer have to click refresh to see when changes have been applied. Or if a user's changes in one system need approval by someone, another system could listen to changes and send notifications requesting approvals instantaneously.
Before change streams, applications that needed to know about the addition of new data in real-time had to continuously poll data or rely on other update mechanisms. One common, if complex, technique for monitoring changes was tailing MongoDB's Operation Log (Oplog). The Oplog is part of the replication system of MongoDB and as such already tracks modifications to the database but is not easy to use for business logic. Change streams are built on top of the Oplog but they provide a native API that improves efficiency and usability. Note that you cannot open a change stream against a collection in a standalone MongoDB server because the feature relies on the Oplog which is only used on replica sets.
When registering a change stream, you need to specify the collection and what types of changes you want to listen to. You can do this by using the `$match` stage and a few other aggregation pipeline stages, which limit the amount of data you will receive. If your database enforces authentication and authorization, change streams provide the same access control as normal queries.
The best way to understand how change streams operate is to work with them. In the next section, we'll show you how to set up a server and scripts. After completing the setup, you will get two scripts: One Python script will listen to notifications from the change stream and print them. The other script will mimic an application by performing insert, update, replace, and delete operations so that you can see the notifications in the output of the first script. You will also learn how to limit the notifications to the ones you are interested in.
To create and activate your virtual environment, run the following commands in your terminal:
For ease of reading, we assume you are running Python 3 with the `python3` and `pip3` commands. If you are running Python 2.7, substitute `python` and `pip` for those commands.
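The original commands are not reproduced here; a typical sequence on macOS or Linux looks roughly like this (the environment name `venv` is arbitrary, and the `pymongo` and `dnspython` packages are assumptions based on what the scripts in this tutorial need):

```shell
python3 -m venv venv              # create the virtual environment
source venv/bin/activate          # activate it in the current shell
pip3 install pymongo dnspython    # driver plus SRV-URI support
```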
If you do not have MongoDB installed, would prefer not to change your local setup, or are fairly new to MongoDB, then we recommend that you set up a MongoDB Atlas cluster; there is a free tier which gives you a three-node replica set, ideal for experimenting and learning. Simply follow the Atlas setup steps until you get the URI connection string in step 8. Take that URI connection string, insert the password where it says `<password>`, and add it to your environment by running an `export` command in your terminal. The string you use as a value will be different.
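The original command is not shown; it is an `export` of the connection string, roughly like this (the URI below is a placeholder, not a real cluster):

```shell
# Placeholder URI -- substitute your own Atlas connection string
export MONGODB_URI="mongodb+srv://user:<password>@yourcluster.mongodb.net/test?retryWrites=true&w=majority"
```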
Before continuing, quickly test your setup. Create a file `test.py` with the following contents:
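The original contents of `test.py` are not shown; a minimal sketch that connects and inserts one document might look like this (the database name `test` and collection name `collection` are assumptions):

```python
import os

from pymongo import MongoClient

# Connect using the URI stored in the MONGODB_URI environment variable
client = MongoClient(os.environ["MONGODB_URI"])
db = client.test  # "test" database name is an assumption

# Insert a document and print the generated ObjectId
result = db.collection.insert_one({"name": "test document"})
print(result.inserted_id)
```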
When you run `python3 test.py`, you should see an `ObjectId` printed to the console, confirming that the insert succeeded.
Now that you've confirmed your setup, let's create the small program that will listen to changes in the database using a change stream. Create a different file `change_streams.py` with the following content:
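The original file is not shown; a minimal sketch of a change stream listener might look like this (the database name `test` and collection name `collection` are assumptions):

```python
import os

from pymongo import MongoClient

client = MongoClient(os.environ["MONGODB_URI"])
db = client.test  # "test" database name is an assumption

# Open a change stream on the collection and print every notification.
# This loop blocks, waiting for events, until the stream is invalidated.
change_stream = db.collection.watch()
for change in change_stream:
    print(change)
    print("")  # blank line between notifications
```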
Go ahead and run `python3 change_streams.py`. You will notice that the program doesn't print anything; it just waits for operations to happen on the specified collection. While keeping the `change_streams` program running, open up another terminal window and run `python3 test.py`. You will have to run the same `export` command you ran in the Set up your Cluster section to add the environment variable to the new terminal window.
Checking the terminal window that is running the `change_streams` program, you will see that the insert operation was logged. It should look like the output below, but with a different `ObjectId` and different field values:
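The original sample output is not reproduced; an insert notification has roughly this shape (the resume token, `ObjectId`s, and values below are placeholders, not real values):

```python
# Illustrative structure of an insert notification; the resume token
# and ObjectIds are placeholders and will differ in your output.
insert_event = {
    "_id": {"_data": "825E..."},  # resume token (placeholder)
    "operationType": "insert",
    "ns": {"db": "test", "coll": "collection"},
    "documentKey": {"_id": "ObjectId('...')"},
    "fullDocument": {"_id": "ObjectId('...')", "name": "test document"},
}
print(insert_event["operationType"])  # prints "insert"
```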
You can listen to four types of document-based events:
- insert
- update
- replace
- delete
Depending on the type of event, the document structure you receive will differ slightly, but you will always receive the following fields:
- `_id`: the resume token for this event
- `operationType`: the type of event
- `ns`: the namespace (database and collection name)
- `documentKey`: the `_id` of the affected document
In the case of insert and replace operations, the `fullDocument` is provided by default as well. In the case of update operations, the extra field provided is `updateDescription`, which gives you the document delta (i.e., the difference between the document before and after the operation). By default, update operations only include this delta. To get the full document with each update, you can set the `fullDocument` option to `updateLookup`. If an update operation ends up changing multiple documents, there will be one notification for each updated document. This transformation occurs to ensure that statements in the oplog are idempotent.
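In PyMongo, this option is passed as `full_document="updateLookup"` to `watch()`; a sketch, with connection details and names assumed as before:

```python
import os

from pymongo import MongoClient

client = MongoClient(os.environ["MONGODB_URI"])

# Ask the server to look up and attach the current full document
# to every update notification.
change_stream = client.test.collection.watch(full_document="updateLookup")
for change in change_stream:
    print(change)
```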
There is one further type of event that can be received: the invalidate event. It tells the driver that the change stream is no longer valid, and the driver will then close the stream. Potential reasons for this include the collection being dropped or renamed.
To see this in action, update your `test.py` and run it while also running the `change_streams.py` script.
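The updated `test.py` is not shown here; one way to trigger an invalidate notification is to drop the collection being watched (database and collection names are assumptions):

```python
import os

from pymongo import MongoClient

client = MongoClient(os.environ["MONGODB_URI"])

# Dropping the watched collection invalidates any open change streams
client.test.collection.drop()
```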
The output should be similar to:
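The original output is not reproduced; as an illustrative sketch, dropping the watched collection produces a `drop` notification followed by an `invalidate` notification, with shapes roughly like these (resume tokens are placeholders):

```python
# Illustrative shapes of the final two notifications after a drop;
# the resume tokens are placeholders, not real values.
drop_event = {
    "_id": {"_data": "825E..."},
    "operationType": "drop",
    "ns": {"db": "test", "coll": "collection"},
}
invalidate_event = {
    "_id": {"_data": "825E..."},
    "operationType": "invalidate",
}
print(drop_event["operationType"], invalidate_event["operationType"])
```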
So far, your program has been listening to all operations. In a real application this would be overwhelming and often unnecessary, as each part of your application will generally want to listen only to specific operations. To limit the notifications you receive, you can use certain aggregation stages when setting up the stream. These stages are: `$match`, `$project`, `$addFields`, `$replaceRoot`, and `$redact`. All other aggregation stages are not available.
You can test this functionality by changing your `change_streams.py` file with the code below and running the `test.py` script. The output should now only contain insert notifications.
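The replacement code is not shown; a sketch that uses a `$match` stage to keep only insert notifications (connection details and names assumed as before):

```python
import os

from pymongo import MongoClient

client = MongoClient(os.environ["MONGODB_URI"])

# Only pass insert notifications through the change stream
pipeline = [{"$match": {"operationType": "insert"}}]
change_stream = client.test.collection.watch(pipeline)
for change in change_stream:
    print(change)
    print("")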
You can also match on document fields and thus limit the stream to certain `DocumentIds` or to documents that have a certain document field, etc.
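For example, a pipeline that only matches inserts whose document has a specific field value might look like this (the `category` field and its value are purely illustrative):

```python
# Hypothetical pipeline: pass only inserts whose document has
# category == "important" ("category" is an illustrative field name).
pipeline = [
    {"$match": {"operationType": "insert", "fullDocument.category": "important"}}
]
print(pipeline[0]["$match"]["operationType"])  # prints "insert"
```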
No matter how good your network, there will be situations when connections fail. To make sure that no changes are missed in such cases, you need to add some code for storing and handling `resumeToken`s. Each event contains a `resumeToken`, for example:
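The original example is not reproduced; a resume token is the `_id` of a notification and looks roughly like this (the value is an illustrative placeholder, as real tokens are long opaque strings):

```python
# Illustrative resume token; real tokens are long opaque hex strings
resume_token = {"_data": "825E4F56C2000000012B022C0100"}
print("_data" in resume_token)  # prints True
```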
When a failure occurs, the driver should automatically make one attempt to reconnect. The application has to handle further retries as needed. This means that the application should take care of always persisting the `resumeToken`.
To retry connecting, the `resumeToken` has to be passed into the optional field `resumeAfter` when creating the new change stream. This does not guarantee that we can always resume the change stream: MongoDB's oplog is a capped collection that keeps a rolling record of the most recent operations. Resuming a change stream is only possible if the oplog has not rolled over yet (that is, if the changes we are interested in are still in the oplog).
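A sketch of this pattern, keeping the token in memory for simplicity (a real application would persist it durably, e.g., to disk or another collection; connection details and names are assumptions):

```python
import os

from pymongo import MongoClient
from pymongo.errors import PyMongoError

client = MongoClient(os.environ["MONGODB_URI"])
collection = client.test.collection  # names are assumptions

resume_token = None  # a real application would load the last persisted token
try:
    change_stream = collection.watch(resume_after=resume_token)
    for change in change_stream:
        resume_token = change["_id"]  # persist this durably in practice
        print(change)
except PyMongoError:
    # The driver has already retried once; reopen the stream from the
    # last token we saw so that no notifications are lost.
    if resume_token is not None:
        for change in collection.watch(resume_after=resume_token):
            print(change)
```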
- Ordering and Rollbacks: MongoDB guarantees that the received events will be in the order they occurred (thus providing a total ordering of changes across shards if you use shards). On top of that, only durable (i.e., majority-committed) changes will be sent to listeners. This means that listeners do not have to consider rollbacks in their applications.
- Reading from Secondaries: Change streams can be opened against any data-bearing node in a cluster, regardless of whether it's primary or secondary. However, it is generally not recommended to read from secondaries, as failovers can lead to increased load and failures in this setup.
- Updates with the fullDocument Option: The `fullDocument` option for update operations does not guarantee that the returned document excludes further changes. In contrast to the document deltas, which are guaranteed to be sent in order with update notifications, there is no guarantee that the `fullDocument` returned represents the document exactly as it was after the operation. `updateLookup` will poll the current version of the document. If changes happen quickly, it is possible that the document was changed again before the `updateLookup` finished. This means that the `fullDocument` might not represent the document at the time of the event, thus potentially giving the impression that events took place in a different order.
In the following, you will set up a single-node replica set named `test-change-streams`. For a production replica set, at least three nodes are recommended.
- Run the following commands in your terminal to create a directory for the database files and start the `mongod` process:
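The original commands are not reproduced; a sketch, assuming a data directory of `/data/test-change-streams` and port `27017` (both are assumptions; adjust to your setup):

```shell
mkdir -p /data/test-change-streams
mongod --replSet test-change-streams --dbpath /data/test-change-streams --port 27017
```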
- Now open up a mongo shell connected to the same port:
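Assuming `mongod` was started on port `27017`:

```shell
mongo --port 27017  # use the port you started mongod with
```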
- Within the mongo shell you just opened, configure your replica set:
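The command to run is `rs.initiate()`; with a single node, the default configuration is sufficient:

```javascript
// Initiate the replica set; with one node the default configuration works
rs.initiate();
```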
- Still within the mongo shell, you can now check that your replica set is working by running `rs.status()`. The output should indicate that your node has become primary. This may take a few seconds, so if you do not see it immediately, run the command again.