An Introduction to Change Streams

Raphael Londner and Alyson Cabral

There is tremendous pressure for applications to immediately react to changes as they occur. As a new feature in MongoDB 3.6, change streams enable applications to stream real-time data changes by leveraging MongoDB’s underlying replication capabilities. Think powering trading applications that need to be updated in real-time as stock prices change. Or creating an IoT data pipeline that generates alarms whenever a connected vehicle moves outside of a geo-fenced area. Or updating dashboards, analytics systems, and search engines as operational data changes. The list, and the possibilities, go on, as change streams give MongoDB users easy access to real-time data changes without the complexity or risk of tailing the oplog (operation log). Any application can readily subscribe to changes and immediately react by making decisions that help the business to respond to events in real-time.

Change streams can notify your application of all writes to documents (including deletes) and provide access to all available information as changes occur, without polling that can introduce delays, incur higher overhead (due to the database being regularly checked even if nothing has changed), and lead to missed opportunities.

Characteristics of change streams

  1. Targeted changes
    Changes can be filtered to provide relevant and targeted changes to listening applications. As an example, filters can be on operation type or fields within the document.
  2. Resumability
    Resumability was top of mind when building change streams to ensure that applications can see every change in a collection. Each change stream response includes a resume token. In cases where the connection between the application and the database is temporarily lost, the application can send the last resume token it received and change streams will pick up right where the application left off. In cases of transient network errors or elections, the driver will automatically make an attempt to reestablish a connection using its cached copy of the most recent resume token. However, to resume after application failure, the application needs to persist the resume token, as drivers do not maintain state over application restarts.
  3. Total ordering
    MongoDB 3.6 has a global logical clock that enables the server to order all changes across a sharded cluster. Applications will always receive changes in the order they were applied to the database.
  4. Durability
    Change streams only include majority-committed changes. This means that every change seen by listening applications is durable in failure scenarios such as a new primary being elected.
  5. Security
    Change streams are secure – users are only able to create change streams on collections to which they have been granted read access.
  6. Ease of use
    Change streams are familiar – the API syntax takes advantage of the established MongoDB drivers and query language, and is independent of the underlying oplog format.
  7. Idempotence
    All changes are transformed into a format that’s safe to apply multiple times. Listening applications can use a resume token from any prior change stream event, not just the most recent one, because reapplying operations is safe and will reach the same consistent state.
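
To make the idempotence property more concrete, here is a minimal sketch in plain JavaScript (it runs in Node.js without a database). The applyChange helper and the in-memory Map standing in for a downstream copy of the collection are hypothetical names, not part of any MongoDB driver, and the sample events use made-up values. The sketch only handles top-level fields. It illustrates that replaying the same events leaves the copy in the same state.

```javascript
// Hypothetical helper: applies one change event to an in-memory copy of a collection.
// `replica` is a Map keyed by document _id (assumed here to be a number or string).
function applyChange(replica, change) {
  const id = change.documentKey._id;
  switch (change.operationType) {
    case "insert":
    case "replace":
      // The event carries the whole document, so writing it again is harmless.
      replica.set(id, Object.assign({}, change.fullDocument));
      break;
    case "update": {
      // Updates arrive as final field values, not increments, so reapplying is safe.
      const doc = Object.assign({}, replica.get(id) || { _id: id });
      Object.assign(doc, change.updateDescription.updatedFields);
      for (const field of change.updateDescription.removedFields) {
        delete doc[field];
      }
      replica.set(id, doc);
      break;
    }
    case "delete":
      replica.delete(id);
      break;
  }
  return replica;
}

// Made-up sample events for one product.
const events = [
  {
    operationType: "insert",
    documentKey: { _id: 1 },
    fullDocument: { _id: 1, name: "pineapple", quantity: 12 }
  },
  {
    operationType: "update",
    documentKey: { _id: 1 },
    updateDescription: { updatedFields: { quantity: 9 }, removedFields: [] }
  }
];

const replica = new Map();
events.forEach((e) => applyChange(replica, e));
// Replay the whole sequence, as could happen after resuming from an older token.
events.forEach((e) => applyChange(replica, e));
console.log(JSON.stringify(replica.get(1))); // {"_id":1,"name":"pineapple","quantity":9}
```

Because the update event carries the resulting quantity rather than a decrement, the second pass ends with the same document as the first.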

An example

Let’s imagine that we run a small grocery store. We want to build an application that notifies us every time we run out of stock for an item. We want to listen for changes on our stock collection and reorder once the quantity of an item gets too low.

{
  _id: "123UAWERXHZK4GYH",
  name: "pineapple",
  quantity: 3
}

Setting up the cluster

Replication is a core feature of MongoDB as a distributed database: it mirrors changes from the primary replica set member to secondary members, enabling applications to maintain availability in the event of failures or scheduled maintenance. Replication relies on the oplog (operation log), a capped collection that records the most recent writes and that secondary members use to apply changes to their own local copy of the database. In MongoDB 3.6, change streams enable listening applications to easily leverage the same internal, efficient replication infrastructure for real-time processing.

To use change streams, we must first create a replica set. Download MongoDB 3.6 and after installing it, run the following commands to set up a simple, single-node replica set (for testing purposes).

mkdir -pv data/db 
mongod --dbpath ./data/db --replSet "rs"

Then in a separate shell tab, run: mongo

After the rs:PRIMARY> prompt appears, run: rs.initiate()

If you have any issues, check out our documentation on creating a replica set.

Seeing it in action

Now that our replica set is ready, let’s create a few products in a demo database using the following Mongo shell script:

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;

var docToInsert = {
  name: "pineapple",
  quantity: 10
};

function sleepFor(sleepDuration) {
  var now = new Date().getTime();
  while (new Date().getTime() < now + sleepDuration) {
    /* do nothing */
  }
}

function create() {
  sleepFor(1000);
  print("inserting doc...");
  docToInsert.quantity = 10 + Math.floor(Math.random() * 10);
  res = collection.insert(docToInsert);
  print(res)
}

while (true) {
  create();
}

Copy the code above into a createProducts.js text file and run it in a Terminal window with the following command: mongo createProducts.js.

Creating a change stream application

Now that we have documents being constantly added to our MongoDB database, we can create a change stream that monitors and handles changes occurring in our stock collection:

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;

const changeStreamCursor = collection.watch();

pollStream(changeStreamCursor);

// This function polls a change stream and prints out each change as it comes in.
// The loop ends only once the cursor has been closed.
function pollStream(cursor) {
  while (!cursor.isExhausted()) {
    if (cursor.hasNext()) {
      const change = cursor.next();
      print(JSON.stringify(change));
    }
  }
}

By using the parameterless watch() method, this change stream will signal every write to the stock collection. In the simple example above, we’re logging the change stream's data to the console. In a real-life scenario, your listening application would do something more useful (such as replicating the data into a downstream system, sending an email notification, or reordering stock). Try inserting a document from another mongo shell session and watch the change appear in the shell that is running the change stream.
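
As a rough illustration of what "something more useful" could look like, the sketch below routes each change event to a handler based on its operationType. The handlers table and the handleChange function are hypothetical names, and the sample event is made up to resemble what pollStream() prints. It is plain JavaScript and runs in Node.js without a database.

```javascript
// Hypothetical dispatcher: routes a change event to a handler by operationType.
// The handlers only build log lines here; a real application might send an
// email or write to a downstream system instead.
const handlers = {
  insert: (change) => "new item: " + change.fullDocument.name,
  update: (change) =>
    "fields changed: " + Object.keys(change.updateDescription.updatedFields).join(", "),
  delete: (change) => "item removed: " + change.documentKey._id
};

function handleChange(change) {
  const handler = handlers[change.operationType];
  // Ignore operation types we have no handler for (for example "invalidate").
  return handler ? handler(change) : null;
}

// Made-up sample event shaped like the output printed by pollStream().
const sampleInsert = {
  operationType: "insert",
  ns: { db: "demo", coll: "stock" },
  documentKey: { _id: "abc" },
  fullDocument: { _id: "abc", name: "pineapple", quantity: 14 }
};

console.log(handleChange(sampleInsert)); // new item: pineapple
console.log(handleChange({ operationType: "invalidate" })); // null
```

Inside pollStream(), the print(JSON.stringify(change)) line could be swapped for a call to a function like handleChange(change).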

Creating a targeted change stream

Remember that our original goal wasn’t to get notified of every single update in the stock collection, just when the inventory of each item in the stock collection falls below a certain threshold. To achieve this, we can create a more targeted change stream for updates that set the quantity of an item to a value no higher than 10. By default, update notifications in change streams only include the modified and deleted fields (i.e. the document “deltas”), but we can use the optional parameter fullDocument: "updateLookup" to include the complete document within the change stream, not just the deltas.

const changeStream = collection.watch(
  [{
    $match: {
      $and: [
        { "updateDescription.updatedFields.quantity": { $lte: 10 } },
        { operationType: "update" }
      ]
    }
  }],
  {
    fullDocument: "updateLookup"
  }
);

Note that the fullDocument property above reflects the state of the document at the time the lookup was performed, not the state of the document at the exact time the update was applied. As a result, other changes made in the meantime may also be reflected in the fullDocument field. Since this use case only deals with updates, it is preferable to build match filters using updateDescription.updatedFields instead of fullDocument.
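
To reason about which events the $match stage above lets through, the following sketch re-expresses the same two conditions as a plain JavaScript predicate. The isLowStockUpdate function is a hypothetical name used for illustration only, and the sample events are made up. In practice the server applies the filter before events reach the application.

```javascript
// Hypothetical predicate mirroring the $match stage above, for illustration only.
function isLowStockUpdate(change) {
  if (change.operationType !== "update") {
    return false;
  }
  const fields = (change.updateDescription || {}).updatedFields || {};
  // Only updates that actually set `quantity` can match; an update touching
  // other fields is skipped even if the stored quantity is already low.
  return typeof fields.quantity === "number" && fields.quantity <= 10;
}

// Made-up sample events.
const samples = [
  { operationType: "update", updateDescription: { updatedFields: { quantity: 7 }, removedFields: [] } },
  { operationType: "update", updateDescription: { updatedFields: { quantity: 11 }, removedFields: [] } },
  { operationType: "update", updateDescription: { updatedFields: { name: "ananas" }, removedFields: [] } },
  { operationType: "insert", fullDocument: { name: "kiwi", quantity: 2 } }
];

console.log(JSON.stringify(samples.map(isLowStockUpdate))); // [true,false,false,false]
```

The last sample shows that a newly inserted item with a low quantity would not trigger a notification under this filter, since only updates are matched.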

The full Mongo shell script of our filtered change stream is available below:

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;

let updateOps = {
  $match: {
    $and: [
      { "updateDescription.updatedFields.quantity": { $lte: 10 } },
      { operationType: "update" }
    ]
  }
};

const changeStreamCursor = collection.watch([updateOps]);

pollStream(changeStreamCursor);

// This function polls a change stream and prints out each change as it comes in.
// The loop ends only once the cursor has been closed.
function pollStream(cursor) {
  while (!cursor.isExhausted()) {
    if (cursor.hasNext()) {
      const change = cursor.next();
      print(JSON.stringify(change));
    }
  }
}

To test our change stream above, let’s run the following script, which keeps decrementing the quantity of every product that is still above 10 until all quantities are 10 or less:

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;
let updatedQuantity = 1;

function sleepFor(sleepDuration) {
  var now = new Date().getTime();
  while (new Date().getTime() < now + sleepDuration) {
    /* do nothing */
  }
}

function update() {
  sleepFor(1000);
  res = collection.update({quantity:{$gt:10}}, {$inc: {quantity: -Math.floor(Math.random() * 10)}}, {multi: true});
  print(res)
  updatedQuantity = res.nMatched + res.nModified;
}

while (updatedQuantity > 0) {
  update();
}

You should now see the change stream window display the update shortly after the script above updates our products in the stock collection.

Resuming a change stream

In most cases, drivers have retry logic to handle loss of connections to the MongoDB cluster (such as timeouts, transient network errors, or elections). In cases where our application fails and wants to resume, we can use the optional parameter resumeAfter : <resumeToken>, as shown below:

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;

const changeStreamCursor = collection.watch();
resumeStream(changeStreamCursor, true);

// Busy-wait helper, as in the earlier scripts
function sleepFor(sleepDuration) {
  var now = new Date().getTime();
  while (new Date().getTime() < now + sleepDuration) {
    /* do nothing */
  }
}

function resumeStream(changeStreamCursor, forceResume = false) {
  let resumeToken;
  while (!changeStreamCursor.isExhausted()) {
    if (changeStreamCursor.hasNext()) {
      const change = changeStreamCursor.next();
      print(JSON.stringify(change));
      // The _id of each change event is its resume token
      resumeToken = change._id;
      if (forceResume === true) {
        print("\r\nSimulating app failure for 10 seconds...");
        sleepFor(10000);
        changeStreamCursor.close();
        const newChangeStreamCursor = collection.watch([], {
          resumeAfter: resumeToken
        });
        print("\r\nResuming change stream with token " + JSON.stringify(resumeToken) + "\r\n");
        resumeStream(newChangeStreamCursor);
        // The original cursor is closed, so there is nothing left to poll here
        return;
      }
    }
  }
}

With this resumability feature, MongoDB change streams provide at-least-once semantics. It is therefore up to the listening application to make sure that it has not already processed the change stream events. This is especially important in cases where the application’s actions are not idempotent (for instance, if each event triggers a wire transfer).
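
One possible way to cope with at-least-once delivery is to remember which events have already been handled and to record the resume token only after an event has been processed. The sketch below is an assumption-laden illustration, not the approach used in the repositories mentioned in this post: createProcessor, the store object, and the token values are all hypothetical, and the plain object stands in for durable storage such as a file or a MongoDB collection. It runs in Node.js without a database.

```javascript
// Hypothetical wrapper that makes a non-idempotent handler safe under
// at-least-once delivery. `store` stands in for durable storage; here it is a
// plain object so that the sketch is runnable on its own.
function createProcessor(handler, store) {
  store.seen = store.seen || {};
  return function processChange(change) {
    // The resume token in change._id is opaque, so serialize it to use as a key.
    const key = JSON.stringify(change._id);
    if (store.seen[key]) {
      return false; // duplicate delivery: skip the side effect
    }
    handler(change);
    // Record the event and the token only after the handler succeeded, so a
    // crash in between leads to a redelivery rather than a lost event.
    store.seen[key] = true;
    store.lastToken = change._id;
    return true;
  };
}

const store = {};
const reorders = [];
const processChange = createProcessor(
  (change) => reorders.push(change.documentKey._id),
  store
);

// Made-up events; the second one is delivered twice, as can happen after a resume.
const first = { _id: { _data: "token-1" }, operationType: "update", documentKey: { _id: "pineapple" } };
const second = { _id: { _data: "token-2" }, operationType: "update", documentKey: { _id: "mango" } };
[first, second, second].forEach((change) => processChange(change));

console.log(reorders.length, JSON.stringify(store.lastToken)); // 2 {"_data":"token-2"}
```

After a restart, the value kept in store.lastToken would be passed as the resumeAfter option. In a long-running application the set of seen tokens would need to be pruned or replaced by a bounded structure.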

All of the shell script examples above are available in the following GitHub repository. You can also find similar Node.js code samples here, where a more realistic technique is used to persist the last change stream token before it is processed.

Next steps

I hope that this introduction gets you excited about the power of change streams in MongoDB 3.6.


If you have any questions, feel free to file a ticket at https://jira.mongodb.org or connect with us through one of the social channels we use to interact with the developer community.

About the authors – Aly Cabral and Raphael Londner

Aly Cabral is a Product Manager at MongoDB. With a focus on Distributed Systems (i.e. Replication and Sharding), when she hears the word election she doesn’t think about politics. You can follow her or ask any questions on Twitter at @aly_cabral

Raphael Londner is a Principal Developer Advocate at MongoDB. Previously he was a developer advocate at Okta as well as a startup entrepreneur in the identity management space. You can follow him on Twitter at @rlondner


