How Do Change Streams Work in MongoDB?

What are Change Streams?

A change stream is a real-time stream of database changes that flows from your database to your application. With change streams, your applications can react—in real time—to data changes in a single collection, a database, or even an entire deployment. For apps that rely on notifications of changing data, change streams are critical.

A few use cases where you might find change streams include:

  • Analytics Dashboards - Change streams can feed dashboards with live updates and provide an audit trail for applications.
  • IoT Event Tracking - Change streams can be used to detect events reported by internet-enabled devices and adjust a system in response, for example, when a device moves outside a geofenced area. A change stream can be filtered to pass through only the events that fall outside this range and trigger an alarm when one occurs.
  • Real-Time Trading Applications - Change streams can be used to track changes to financial data and react to them in real time.
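As a sketch of the IoT geofencing use case, the Python helper below builds a change stream pipeline that passes through only events whose device position lies outside a fence. It is an illustration under assumptions: the `geofence_alert_pipeline` name, the `fullDocument.location.lat` / `fullDocument.location.lon` fields, and the use of a simple rectangular bounding box are all hypothetical choices made for this example.

```python
def geofence_alert_pipeline(min_lat, max_lat, min_lon, max_lon):
    """Build a change stream pipeline that keeps only events whose
    (hypothetical) fullDocument.location.lat/lon falls outside a box."""
    outside_box = [
        {"fullDocument.location.lat": {"$lt": min_lat}},
        {"fullDocument.location.lat": {"$gt": max_lat}},
        {"fullDocument.location.lon": {"$lt": min_lon}},
        {"fullDocument.location.lon": {"$gt": max_lon}},
    ]
    return [
        {
            "$match": {
                # Only react to writes that can carry a device position
                "operationType": {"$in": ["insert", "update", "replace"]},
                # Any coordinate beyond the fence counts as "outside"
                "$or": outside_box,
            }
        }
    ]


pipeline = geofence_alert_pipeline(40.70, 40.80, -74.02, -73.93)
```

With PyMongo, a pipeline like this could be passed as `db.devices.watch(pipeline, full_document='updateLookup')` (the `devices` collection is hypothetical). The `full_document='updateLookup'` option matters because update events do not include `fullDocument` by default, so without it the filter would only ever see inserts and replacements.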

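If you’re using MongoDB 3.6 or later, change streams are already built in, and taking advantage of them is easy. (Watching an entire database or deployment, rather than a single collection, requires MongoDB 4.0 or later.)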

Let’s look at the major features of MongoDB change streams, some of your options with those streams (such as modifying the output of the stream), and finally dive into code to see how to implement MongoDB change streams with Python and Node.js.

Features of MongoDB Change Streams

MongoDB change streams provide a high-level API that can notify an application of changes to a MongoDB database, collection, or cluster, without using polling (which would come with much higher overhead). Here are some characteristics of change streams that might help you to understand how change streams work and what they can be used for:

  • Filterable - Applications can filter changes to receive only those change notifications they need.
  • Resumable - Change streams are resumable because each response comes with a resume token. Using the token, an application can start the stream where it left off (if it ever disconnects).
  • In order - Change notifications occur in the same order that the database was updated.
  • Durable - Change streams only include majority-committed changes. This is so every change seen by listening applications is durable in failure scenarios, such as electing a new primary.
  • Secure - Only users with rights to read a collection can create a change stream on that collection.
  • Easy to use - The syntax of the change streams API uses the existing MongoDB drivers and query language.
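To make the "resumable" and "in order" properties concrete, here is a toy in-memory model in Python. It is not how the server works: real resume tokens are opaque documents generated by MongoDB, and the `ToyChangeStream` class and its integer tokens are hypothetical. It only illustrates the contract: events come back in the order they were recorded, and a client that remembers the last token it processed can ask for everything after it.

```python
from dataclasses import dataclass, field


@dataclass
class ToyChangeStream:
    """Hypothetical stand-in for a change stream: an append-only,
    ordered log where each event gets an increasing resume token."""
    events: list = field(default_factory=list)

    def record(self, operation_type, document_key):
        token = len(self.events) + 1  # toy token: position in the log
        self.events.append({
            "_id": token,
            "operationType": operation_type,
            "documentKey": {"_id": document_key},
        })
        return token

    def watch(self, resume_after=None):
        """Yield events in recorded order, starting just after
        `resume_after` when a token is supplied."""
        start = 0 if resume_after is None else resume_after
        yield from self.events[start:]


log = ToyChangeStream()
for op, key in [("insert", 1), ("update", 1), ("delete", 2)]:
    log.record(op, key)

# Read one event, then "disconnect" and remember its token.
first = next(log.watch())
cached_token = first["_id"]

# Reopen after the cached token: nothing is missed or repeated.
resumed = [e["operationType"] for e in log.watch(resume_after=cached_token)]
```

Here `resumed` holds the update and the delete, the two events the client had not yet seen, in their original order.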

Availability of MongoDB Change Streams

To use change streams in MongoDB, your environment must meet a few requirements. Change streams are available only on replica sets and sharded clusters, not on standalone servers, because they are built on the replication oplog.

In MongoDB 4.0 and earlier, change streams are available only if read concern "majority" support is enabled (it is enabled by default). Read concern "majority" guarantees that the documents read are durable and will not be rolled back. Starting in MongoDB 4.2, change streams are available regardless of whether read concern "majority" support is enabled.
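The version rule above can be written as a small Python predicate, which may help when reasoning about a particular deployment. This is a simplification: the `change_streams_available` function and its parameters are hypothetical, and it encodes only the version rule plus the fact that change streams need a replica set or sharded cluster, not every server-side requirement.

```python
def change_streams_available(version, topology, majority_read_concern=True):
    """Return True if change streams should be usable.

    version: (major, minor) tuple, e.g. (4, 2)
    topology: "standalone", "replica_set", or "sharded"
    majority_read_concern: whether read concern "majority" support is
        enabled (the default); it only matters for 4.0 and earlier.
    """
    if version < (3, 6):
        return False  # change streams were introduced in 3.6
    if topology not in ("replica_set", "sharded"):
        return False  # a standalone server has no oplog to watch
    if version <= (4, 0):
        return majority_read_concern
    return True  # 4.2+: no longer depends on majority read concern
```

For example, a 4.0 replica set with majority read concern disabled cannot open change streams, while a 4.2 replica set with the same setting can.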

Using Change Streams in MongoDB Atlas

If you want to experiment with MongoDB change streams and don’t have a development environment set up that supports using them, you can sign up for an account at MongoDB Atlas and choose the free cluster option. Minutes later, you will have a cluster that supports change streams and is free for life.


Opening a Change Stream

To open a change stream on a replica set, you can connect to any of its data-bearing members. For a sharded cluster, you must open the change stream through mongos.

Fortunately, most MongoDB drivers support using change streams in a syntax that should be pretty familiar. Let’s look at examples using the official MongoDB Node Driver in Node.js and PyMongo in Python to see how easy it really is.

MongoDB Change Streams Node.js Example

This example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. It assumes that you have connected to a MongoDB replica set and accessed a database with a comment collection.

Here, we use a stream to process all change events in the comment collection:

const { MongoClient } = require('mongodb');

async function main() {
  const client = new MongoClient('YOUR_CONNECTION_STRING');
  await client.connect();
  const collection = client.db('blog').collection('comment');
  const changeStream = collection.watch();
  changeStream.on('change', next => {
    // do something when there is a change in the comment collection
  });
}

main().catch(console.error);

But we can also use an iterator to process the change events by iterating the change stream cursor:

// await is only valid inside an async function (or an ES module)
const changeStreamIterator = collection.watch();
const next = await changeStreamIterator.next();

MongoDB Change Streams Python Example

Opening a change stream with Python is very similar and just as easy. In Python, we open a change stream for a collection and iterate over the cursor to retrieve the change stream documents. This example assumes you have connected to a MongoDB replica set and have accessed a database containing an inventory collection.

cursor = db.inventory.watch()
document = next(cursor)
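The `document` returned by `next(cursor)` is an ordinary dictionary. The sketch below shows one way to branch on its documented fields (`operationType`, `documentKey`, `fullDocument`, and `updateDescription`). The `summarize_change` helper and the sample events are hypothetical and trimmed to the fields the code reads.

```python
def summarize_change(change):
    """Turn a change stream event into a one-line description."""
    op = change["operationType"]
    key = change.get("documentKey", {}).get("_id")
    if op == "insert":
        return f"inserted {key}: {change['fullDocument']}"
    if op == "update":
        # Update events describe the delta, not the whole document
        delta = change["updateDescription"]
        return (f"updated {key}: set {sorted(delta['updatedFields'])}, "
                f"removed {delta['removedFields']}")
    if op == "delete":
        return f"deleted {key}"
    return f"{op} event"  # e.g. replace, drop, invalidate


insert_event = {
    "operationType": "insert",
    "documentKey": {"_id": 1},
    "fullDocument": {"_id": 1, "item": "journal", "qty": 25},
}
update_event = {
    "operationType": "update",
    "documentKey": {"_id": 1},
    "updateDescription": {"updatedFields": {"qty": 20}, "removedFields": []},
}
```

In a real application, `summarize_change(next(cursor))` would describe whatever change arrived next on the stream.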

Modifying the Output of a MongoDB Change Stream

Customizing the change stream to meet your needs is also straightforward. You control the change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream. For details of what each does, you can check out the documentation.

  • $addFields
  • $match
  • $project
  • $replaceRoot
  • $replaceWith (Available starting in MongoDB 4.2)
  • $redact
  • $set (Available starting in MongoDB 4.2)
  • $unset (Available starting in MongoDB 4.2)

This allows you to filter the complete change stream down to just those changes you want to listen for. For example, here’s how to modify the change stream output with both Node.js and Python.

Node.js:

const pipeline = [
  { $match: { 'fullDocument.username': 'alice' } },
  { $addFields: { newField: 'this is an added field!' } }
];

const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
  // process next document
});

Python:

pipeline = [
    {'$match': {'fullDocument.username': 'alice'}},
    {'$addFields': {'newField': 'this is an added field!'}}
]
cursor = db.inventory.watch(pipeline=pipeline)
document = next(cursor)
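One detail worth knowing about the `$match` above: by default, update events carry only `updateDescription`, not `fullDocument`, so a filter on `fullDocument.username` sees inserts and replacements but skips updates unless the stream is opened with `full_document='updateLookup'` in PyMongo (`fullDocument: 'updateLookup'` in Node.js). The Python sketch below imitates that equality match locally on hand-written events to show the effect. The `get_path` and `matches` helpers are hypothetical and mimic only simple dotted-path equality, not MongoDB's query engine.

```python
def get_path(doc, dotted):
    """Follow a dotted path such as 'fullDocument.username'."""
    for part in dotted.split("."):
        if not isinstance(doc, dict) or part not in doc:
            return None
        doc = doc[part]
    return doc


def matches(event, dotted, value):
    """Simplified stand-in for an equality $match on one field."""
    return get_path(event, dotted) == value


insert_event = {
    "operationType": "insert",
    "fullDocument": {"_id": 1, "username": "alice"},
}
# Default update event: no fullDocument, only the changed fields
update_event = {
    "operationType": "update",
    "documentKey": {"_id": 1},
    "updateDescription": {"updatedFields": {"qty": 5}, "removedFields": []},
}
# The same update when the stream is opened with updateLookup
looked_up = dict(update_event, fullDocument={"_id": 1, "username": "alice"})

results = [matches(e, "fullDocument.username", "alice")
           for e in (insert_event, update_event, looked_up)]
```

Only the insert and the looked-up update match. Note that with `updateLookup` the server fetches the current version of the document, which may already include later writes.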

Resuming Change Streams

If a change stream is disconnected, your application can reopen it and start listening where it left off. This is possible because each change stream event comes with a resume token.

{
  _id: <resumeToken>,
  operationType: 'update',
  ...
}

The client can reestablish its change stream by passing this resume token, and it will be able to start where it left off.

MongoDB drivers also automatically attempt to resume once after a resumable error, such as a transient network error. You can still access the resume token yourself and write your own retry logic.

Here, we cache the resume token from the change by storing the _id. On an error, we try to reestablish the change stream using this cached resume token.

In Node.js:

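// Assumes `collection` is the collection opened in the earlier examples
let cachedResumeToken;

function establishChangeStream(resumeToken) {
  // resumeAfter asks the server to continue right after this token
  const options = resumeToken ? { resumeAfter: resumeToken } : {};
  const changeStream = collection.watch([], options);
  changeStream.on('change', (change) => {
    console.log(change);
    cachedResumeToken = change._id; // cache the latest resume token
  });
  changeStream.on('error', () => {
    if (cachedResumeToken) {
      establishChangeStream(cachedResumeToken);
    }
  });
}

establishChangeStream();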

In Python, the code is even simpler:

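# Save the token of the last change the old stream returned
resume_token = cursor.resume_token
# Reopen the stream so it continues right after that change
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)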
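The same cache-and-reopen idea can be wrapped in a reusable loop. The Python sketch below is an illustration, not a library API: `consume_with_resume`, its `open_stream(resume_after)` callable, and the `max_retries` and `retry_on` parameters are hypothetical names. With PyMongo, `open_stream` could be `lambda token: db.inventory.watch(resume_after=token)` and `retry_on` narrowed to `pymongo.errors.PyMongoError`; here a fake stream stands in so the logic runs without a server.

```python
def consume_with_resume(open_stream, handle, max_retries=3,
                        retry_on=(ConnectionError,)):
    """Process changes, reopening the stream after errors.

    open_stream(resume_after) must return an iterator of change events;
    each event's _id is treated as its resume token.
    """
    token = None
    failures = 0
    while True:
        try:
            for change in open_stream(token):
                handle(change)
                token = change["_id"]  # cache only after handling succeeds
                failures = 0
            return token  # the stream ended normally
        except retry_on:
            failures += 1
            if failures > max_retries:
                raise
            # Loop around and reopen, resuming after the cached token


# Fake history and a stream that drops once partway through.
history = [{"_id": i, "operationType": "insert"} for i in range(1, 6)]
drops = {"remaining": 1}


def fake_open_stream(resume_after):
    start = 0 if resume_after is None else resume_after
    for change in history[start:]:
        if change["_id"] == 3 and drops["remaining"]:
            drops["remaining"] -= 1
            raise ConnectionError("simulated network blip")
        yield change


seen = []
last_token = consume_with_resume(fake_open_stream, lambda c: seen.append(c["_id"]))
```

Because the token is cached only after a change has been handled, the reopened stream picks up at the change that was interrupted, so `seen` contains every event exactly once.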

Conclusion

Change streams transform a MongoDB database into a real-time database by taking advantage of MongoDB’s replication process. They monitor replication in MongoDB, providing an API for external applications that require real-time data without the risk involved in tailing the oplog or the overhead that comes with polling. For more details, check out the official change stream documentation.

FAQs

What is a change stream in MongoDB?

A change stream is a real-time stream of database changes that flows from your MongoDB database to your application.


Does MongoDB allow duplicates?

MongoDB allows duplicate field values unless you create a unique index on a field or combination of fields. (The _id field is always unique within a collection.)


Is MongoDB a real-time database?

You can use MongoDB like a real-time database by implementing change streams.


Does MongoDB have triggers?

MongoDB does not have native support for triggers. But by using change streams, which can notify an external application of any document changes, you can create your own triggers from scratch in the programming language of your choice, or you can create triggers in MongoDB Realm.
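To illustrate the do-it-yourself route, here is a minimal Python sketch of a trigger registry that routes change events to handler functions by collection and operation type. The `on` decorator, the `dispatch` function, and the `orders` collection are hypothetical; in a real application, `dispatch` would be called for each event read from a change stream, such as one opened with `db.watch()` in PyMongo.

```python
from collections import defaultdict

# (collection name, operationType) -> list of handler functions
_triggers = defaultdict(list)


def on(collection, operation_type):
    """Register the decorated function as a trigger."""
    def register(func):
        _triggers[(collection, operation_type)].append(func)
        return func
    return register


def dispatch(change):
    """Run every trigger registered for this event's collection and
    operation type, and return how many fired."""
    key = (change.get("ns", {}).get("coll"), change["operationType"])
    handlers = _triggers.get(key, [])
    for handler in handlers:
        handler(change)
    return len(handlers)


audit_log = []


@on("orders", "insert")
def record_new_order(change):
    audit_log.append(("new order", change["documentKey"]["_id"]))


fired = dispatch({
    "operationType": "insert",
    "ns": {"db": "shop", "coll": "orders"},
    "documentKey": {"_id": 42},
})
```

Events without a registered trigger, or without a collection name (such as `invalidate`), simply fire nothing.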

