Change Streams & Triggers with Node.js Tutorial

Lauren Schaefer • Published Feb 04, 2022 • Updated May 26, 2022
Tags: Node.js, MongoDB, Change Streams, JavaScript
Sometimes you need to react immediately to changes in your database. Perhaps you want to place an order with a distributor whenever an item's inventory drops below a given threshold. Or perhaps you want to send an email notification whenever the status of an order changes. Regardless of your particular use case, whenever you want to react immediately to changes in your MongoDB database, change streams and triggers are fantastic options.
This post uses MongoDB 4.4, MongoDB Node.js Driver 3.6.4, and Node.js 14.15.4.
If you're just joining us in this Quick Start with MongoDB and Node.js series, welcome! We began by walking through how to connect to MongoDB and perform each of the CRUD (Create, Read, Update, and Delete) operations. Then we jumped into more advanced topics like the aggregation framework and transactions. The code we write today will use the same structure as the code we built in the first post in the series, so if you have any questions about how to get started or how the code is structured, head back to that post.
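And, with that, let's dive into change streams and triggers! Here is a summary of what we'll cover today:
  • What are Change Streams?
  • Set Up
  • Create a Change Stream
  • Resume a Change Stream
  • What are MongoDB Atlas Triggers?
  • Create a MongoDB Atlas Trigger
  • Wrapping Up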
Get started with an M0 cluster on Atlas today. It's free forever, and it's the easiest way to try out the steps in this blog series.

What are Change Streams?

Change streams allow you to receive notifications about changes made to your MongoDB databases and collections. When you use change streams, you can choose to program actions that will be automatically taken whenever a change event occurs.
Change streams utilize the aggregation framework, so you can choose to filter for specific change events or transform the change event documents.
For example, let's say I want to be notified whenever a new listing in the Sydney, Australia market is added to the listingsAndReviews collection. I could create a change stream that monitors the listingsAndReviews collection and use an aggregation pipeline to match on the listings I'm interested in.
Let's take a look at three different ways to implement this change stream.

Set Up

As with all posts in this MongoDB and Node.js Quick Start series, you'll need to ensure you've completed the prerequisite steps outlined in the Set Up section of the first post in this series.
I find it helpful to have a script that will generate sample data when I'm testing change streams. To help you quickly generate sample data, I wrote changeStreamsTestData.js. Download a copy of the file, update the uri constant to reflect your Atlas connection info, and run it by executing node changeStreamsTestData.js. The script will do the following:
  1. Create 3 new listings (Opera House Views, Private room in London, and Beautiful Beach House)
  2. Update 2 of those listings (Opera House Views and Beautiful Beach House)
  3. Create 2 more listings (Italian Villa and Sydney Harbour Home)
  4. Delete a listing (Sydney Harbour Home).
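For orientation, here is a minimal sketch of the kind of work such a script performs. It is not the downloadable changeStreamsTestData.js: the function name generateTestData, the document fields, and the beds values are assumptions made for illustration. Only the listing names and the order of operations come from the list above, and the sample_airbnb database and listingsAndReviews collection are the ones used later in this post.

```javascript
// Hypothetical stand-in for changeStreamsTestData.js.
// `client` is an already connected MongoClient. Field names and values are assumptions.
async function generateTestData(client) {
    const collection = client.db("sample_airbnb").collection("listingsAndReviews");

    // 1. Create 3 new listings (address is only filled in where the name suggests a location)
    await collection.insertMany([
        { name: "Opera House Views", address: { market: "Sydney", country: "Australia" } },
        { name: "Private room in London", address: { market: "London", country: "United Kingdom" } },
        { name: "Beautiful Beach House" }
    ]);

    // 2. Update 2 of those listings
    await collection.updateOne({ name: "Opera House Views" }, { $set: { beds: 2 } });
    await collection.updateOne({ name: "Beautiful Beach House" }, { $set: { beds: 10 } });

    // 3. Create 2 more listings
    await collection.insertMany([
        { name: "Italian Villa" },
        { name: "Sydney Harbour Home", address: { market: "Sydney", country: "Australia" } }
    ]);

    // 4. Delete a listing
    await collection.deleteOne({ name: "Sydney Harbour Home" });
}
```

Each of these calls produces at least one change event, which is what makes a script like this handy for exercising a change stream from a second shell.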

Create a Change Stream

Now that we're set up, let's explore three different ways to work with a change stream in Node.js.
Get a Copy of the Node.js Template
To make following along with this blog post easier, I've created a starter template for a Node.js script that accesses an Atlas cluster.
  1. Download a copy of template.js.
  2. Open template.js in your favorite code editor.
  3. Update the Connection URI to point to your Atlas cluster. If you're not sure how to do that, refer back to the first post in this series.
  4. Save the file as changeStreams.js.
You can run this file by executing node changeStreams.js in your shell. At this point, the file simply opens and closes a connection to your Atlas cluster, so no output is expected. If you see DeprecationWarnings, you can ignore them for the purposes of this post.
Create a Helper Function to Close the Change Stream
Regardless of how we monitor changes in our change stream, we will want to close the change stream after a certain amount of time. Let's create a helper function to do just that.
  1. Paste the following function in changeStreams.js.
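A minimal sketch of such a helper is shown here. The name closeChangeStream, the parameter order, the 60-second default, and the log message are assumptions for illustration. The only driver call it relies on is ChangeStream's close().

```javascript
// Close the given change stream after timeInMs milliseconds (60 seconds by default).
// Returns a Promise that resolves once close() has been called, so callers can await it.
function closeChangeStream(timeInMs = 60000, changeStream) {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log("Closing the change stream");
            changeStream.close();
            resolve();
        }, timeInMs);
    });
}
```

Returning a Promise lets a monitoring function either await the timer, which keeps the function alive until the stream closes, or start it and carry on with other work.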
Monitor Change Stream using EventEmitter's on()
The MongoDB Node.js Driver's ChangeStream class inherits from the Node.js built-in class EventEmitter. As a result, we can use EventEmitter's on() function to add a listener function that will be called whenever a change occurs in the change stream.
Create the Function
Let's create a function that will monitor changes in the change stream using EventEmitter's on().
  1. Continuing to work in changeStreams.js, create an asynchronous function named monitorListingsUsingEventEmitter. The function should have the following parameters: a connected MongoClient, a time in ms that indicates how long the change stream should be monitored, and an aggregation pipeline that the change stream will use.
  2. Now we need to access the collection we will monitor for changes. Add the following code to monitorListingsUsingEventEmitter().
  3. Now we are ready to create our change stream. We can do so by using Collection's watch(). Add the following line beneath the existing code in monitorListingsUsingEventEmitter().
  4. Once we have our change stream, we can add a listener to it. Let's log each change event in the console. Add the following line beneath the existing code in monitorListingsUsingEventEmitter().
  5. We could choose to leave the change stream open indefinitely. Instead, let's call our helper function to set a timer and close the change stream. Add the following line beneath the existing code in monitorListingsUsingEventEmitter().
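Put together, the five steps above might look like the following sketch. The default argument values and the body of the closeChangeStream helper are assumptions. The database and collection names, watch(), and the on() listener follow the steps described above.

```javascript
// Helper from the previous section (sketch): close the stream after timeInMs milliseconds.
function closeChangeStream(timeInMs = 60000, changeStream) {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log("Closing the change stream");
            changeStream.close();
            resolve();
        }, timeInMs);
    });
}

// `client` is a connected MongoClient; `pipeline` is an aggregation pipeline (empty = all events).
async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeline = []) {
    // Step 2: access the collection we want to monitor
    const collection = client.db("sample_airbnb").collection("listingsAndReviews");

    // Step 3: open the change stream
    const changeStream = collection.watch(pipeline);

    // Step 4: log every change event as it arrives
    changeStream.on("change", (next) => {
        console.log(next);
    });

    // Step 5: keep the function alive until the timer closes the change stream
    await closeChangeStream(timeInMs, changeStream);
}
```

Because the listener is registered with on(), the function itself never polls. It only waits on the timer while the driver delivers events to the callback.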
Call the Function
Now that we've implemented our function, let's call it!
  1. Inside of main() beneath the comment that says Make the appropriate DB calls, call your monitorListingsUsingEventEmitter() function, passing it the connected MongoClient, for example: await monitorListingsUsingEventEmitter(client);
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 60 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. A change event document for each insert, update, and delete will be logged in your first shell, where you are running changeStreams.js.
    If you run node changeStreamsTestData.js again before the 60-second timer has completed, you will see similar output.
    After 60 seconds, the helper function will close the change stream.
Call the Function with an Aggregation Pipeline
In some cases, you will not care about all change events that occur in a collection. Instead, you will want to limit what changes you are monitoring. You can use an aggregation pipeline to filter the changes or transform the change stream event documents.
In our case, we only care about new listings in the Sydney, Australia market. Let's create an aggregation pipeline to filter for only those changes in the listingsAndReviews collection.
To learn more about what aggregation pipeline stages can be used with change streams, see the official change streams documentation.
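As a sketch, a pipeline for the Sydney filter described above could consist of a single $match stage. The address.country and address.market field names are assumptions about how listing documents are structured. operationType and fullDocument are standard fields of a change event document.

```javascript
// Keep only insert events whose new document is a listing in the Sydney, Australia market.
// The address.* field names are assumptions about the listing documents.
const pipeline = [
    {
        $match: {
            operationType: "insert",
            "fullDocument.address.country": "Australia",
            "fullDocument.address.market": "Sydney"
        }
    }
];
```

Note that the filter is applied to the change event, not to the listing itself, which is why the listing's fields are addressed through fullDocument.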
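  1. Inside of main() and above your existing call to monitorListingsUsingEventEmitter(), create an aggregation pipeline with a single $match stage that keeps only insert events for listings in the Sydney, Australia market.
  2. Let's use this pipeline to filter the changes in our change stream. Update your existing call to monitorListingsUsingEventEmitter() to only leave the change stream open for 30 seconds and use the pipeline, for example: await monitorListingsUsingEventEmitter(client, 30000, pipeline);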
  3. Save your file.
  4. Run your script by executing node changeStreams.js in your shell. The change stream will open for 30 seconds.
  5. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Because the change stream is using the pipeline you just created, only documents inserted into the listingsAndReviews collection that are in the Sydney, Australia market will be in the change stream. Only those insert events will be logged in your first shell, where you are running changeStreams.js.
    After 30 seconds, the helper function will close the change stream.
Monitor Change Stream using ChangeStream's hasNext()
In the section above, we used EventEmitter's on() to monitor the change stream. Alternatively, we can create a while loop that waits for the next element in the change stream by using hasNext() from the MongoDB Node.js Driver's ChangeStream class.
Create the Function
Let's create a function that will monitor changes in the change stream using ChangeStream's hasNext().
  1. Continuing to work in changeStreams.js, create an asynchronous function named monitorListingsUsingHasNext. The function should have the following parameters: a connected MongoClient, a time in ms that indicates how long the change stream should be monitored, and an aggregation pipeline that the change stream will use.
  2. Now we need to access the collection we will monitor for changes. Add the following code to monitorListingsUsingHasNext().
  3. Now we are ready to create our change stream. We can do so by using Collection's watch(). Add the following line beneath the existing code in monitorListingsUsingHasNext().
  4. We could choose to leave the change stream open indefinitely. Instead, let's call our helper function that will set a timer and close the change stream. Add the following line beneath the existing code in monitorListingsUsingHasNext().
  5. Now let's create a while loop that will wait for new changes in the change stream. We can use ChangeStream's hasNext() inside of the while loop. hasNext() will wait to return true until a new change arrives in the change stream. hasNext() will throw an error as soon as the change stream is closed, so we will surround our while loop with a try { } block. If an error is thrown, we'll check to see if the change stream is closed. If the change stream is closed, we'll log that information. Otherwise, something unexpected happened, so we'll throw the error. Add the following code beneath the existing code in monitorListingsUsingHasNext().
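The five steps above could be assembled as in the following sketch. The default argument values, the log messages, and the body of closeChangeStream are assumptions. The closed check uses isClosed(), which is what the 3.6 driver used in this post provides. Other driver versions may expose this state differently.

```javascript
// Helper from the earlier section (sketch): close the stream after timeInMs milliseconds.
function closeChangeStream(timeInMs = 60000, changeStream) {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log("Closing the change stream");
            changeStream.close();
            resolve();
        }, timeInMs);
    });
}

// `client` is a connected MongoClient; `pipeline` is an aggregation pipeline (empty = all events).
async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline = []) {
    const collection = client.db("sample_airbnb").collection("listingsAndReviews");
    const changeStream = collection.watch(pipeline);

    // Start the timer but do not await it: the while loop below does the waiting.
    closeChangeStream(timeInMs, changeStream);

    try {
        // hasNext() resolves to true once a change event is available
        while (await changeStream.hasNext()) {
            console.log(await changeStream.next());
        }
    } catch (error) {
        if (changeStream.isClosed()) {
            // Expected: the timer closed the stream while hasNext() was waiting
            console.log("The change stream is closed. Will not wait on any more changes.");
        } else {
            // Anything else is unexpected, so let the caller see it
            throw error;
        }
    }
}
```

Unlike the EventEmitter version, the timer is not awaited here. Awaiting it would block the function before the loop ever started reading events.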
Call the Function
Now that we've implemented our function, let's call it!
  1. Inside of main(), replace your existing call to monitorListingsUsingEventEmitter() with a call to your new monitorListingsUsingHasNext(), for example: await monitorListingsUsingHasNext(client);
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 60 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Output similar to what we saw earlier will be displayed in your first shell where you are running changeStreams.js. If you run node changeStreamsTestData.js again before the 60-second timer has completed, you will see similar output again. After 60 seconds, the change stream will close and the while loop will stop waiting for changes.
Call the Function with an Aggregation Pipeline
As we discussed earlier, sometimes you will want to use an aggregation pipeline to filter the changes in your change stream or transform the change stream event documents. Let's pass the aggregation pipeline we created in an earlier section to our new function.
  1. Update your existing call to monitorListingsUsingHasNext() to only leave the change stream open for 30 seconds and use the aggregation pipeline, for example: await monitorListingsUsingHasNext(client, 30000, pipeline);
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 30 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Because the change stream is using the pipeline you just created, only documents inserted into the listingsAndReviews collection that are in the Sydney, Australia market will be in the change stream. Output similar to what we saw earlier while using a change stream with an aggregation pipeline will be displayed in your first shell where you are running changeStreams.js. After 30 seconds, the change stream will close and the while loop will stop waiting for changes.
Monitor Change Stream using the Stream API
In the previous two sections, we used EventEmitter's on() and ChangeStream's hasNext() to monitor changes. Let's examine a third way to monitor a change stream: using Node's Stream API.
Load the Stream Module
In order to use the Stream module, we will need to load it.
  1. Continuing to work in changeStreams.js, load the Stream module at the top of the file: const stream = require('stream');
Create the Function
Let's create a function that will monitor changes in the change stream using the Stream API.
  1. Continuing to work in changeStreams.js, create an asynchronous function named monitorListingsUsingStreamAPI. The function should have the following parameters: a connected MongoClient, a time in ms that indicates how long the change stream should be monitored, and an aggregation pipeline that the change stream will use.
  2. Now we need to access the collection we will monitor for changes. Add the following code to monitorListingsUsingStreamAPI().
  3. Now we are ready to create our change stream. We can do so by using Collection's watch(). Add the following line beneath the existing code in monitorListingsUsingStreamAPI().
  4. Now we're ready to monitor our change stream. ChangeStream's stream() will return a Node Readable stream. We will call Readable's pipe() to pull the data out of the stream and write it to the console.
  5. We could choose to leave the change stream open indefinitely. Instead, let's call our helper function that will set a timer and close the change stream. Add the following line beneath the existing code in monitorListingsUsingStreamAPI().
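As a sketch, the steps above could come together as follows. The default argument values, the Writable that logs each document, and the body of closeChangeStream are assumptions. stream(), pipe(), and the stream module are the pieces named in the steps.

```javascript
// Load Node's built-in Stream module
const stream = require('stream');

// Helper from the earlier section (sketch): close the stream after timeInMs milliseconds.
function closeChangeStream(timeInMs = 60000, changeStream) {
    return new Promise((resolve) => {
        setTimeout(() => {
            console.log("Closing the change stream");
            changeStream.close();
            resolve();
        }, timeInMs);
    });
}

// `client` is a connected MongoClient; `pipeline` is an aggregation pipeline (empty = all events).
async function monitorListingsUsingStreamAPI(client, timeInMs = 60000, pipeline = []) {
    const collection = client.db("sample_airbnb").collection("listingsAndReviews");
    const changeStream = collection.watch(pipeline);

    // stream() returns a Readable; pipe it into a Writable that logs each change event.
    // objectMode is needed because the stream carries documents rather than bytes.
    changeStream.stream().pipe(
        new stream.Writable({
            objectMode: true,
            write(doc, _encoding, callback) {
                console.log(doc);
                callback();
            }
        })
    );

    // Keep the function alive until the timer closes the change stream
    await closeChangeStream(timeInMs, changeStream);
}
```

The Writable here only logs, but the same pipe() call could feed any other writable destination, which is the main attraction of this approach.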
Call the Function
Now that we've implemented our function, let's call it!
  1. Inside of main(), replace your existing call to monitorListingsUsingHasNext() with a call to your new monitorListingsUsingStreamAPI(), for example: await monitorListingsUsingStreamAPI(client);
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 60 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Output similar to what we saw earlier will be displayed in your first shell where you are running changeStreams.js. If you run node changeStreamsTestData.js again before the 60-second timer has completed, you will see similar output again. After 60 seconds, the helper function will close the change stream.
Call the Function with an Aggregation Pipeline
As we discussed earlier, sometimes you will want to use an aggregation pipeline to filter the changes in your change stream or transform the change stream event documents. Let's pass the aggregation pipeline we created in an earlier section to our new function.
  1. Update your existing call to monitorListingsUsingStreamAPI() to only leave the change stream open for 30 seconds and use the aggregation pipeline, for example: await monitorListingsUsingStreamAPI(client, 30000, pipeline);
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 30 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Because the change stream is using the pipeline you just created, only documents inserted into the listingsAndReviews collection that are in the Sydney, Australia market will be in the change stream. Output similar to what we saw earlier while using a change stream with an aggregation pipeline will be displayed in your first shell where you are running changeStreams.js. After 30 seconds, the helper function will close the change stream.

Resume a Change Stream

At some point, your application will likely lose the connection to the change stream. Perhaps a network error will occur and a connection between the application and the database will be dropped. Or perhaps your application will crash and need to be restarted (but you're a 10x developer and that would never happen to you, right?).
In those cases, you may want to resume the change stream where you previously left off so you don't lose any of the change events.
Each change stream event document contains a resume token, stored in the _id field of the event. The Node.js driver automatically caches the most recent resume token it has received.
The application can pass the resume token when creating a new change stream. The change stream will include all events that happened after the event associated with the given resume token.
The MongoDB Node.js driver will automatically attempt to reestablish connections in the event of transient network errors or elections. In those cases, the driver will use its cached copy of the most recent resume token so that no change stream events are lost.
In the event of an application failure or restart, the application will need to pass the resume token when creating the change stream in order to ensure no change stream events are lost. Keep in mind that the driver will lose its cached copy of the most recent resume token when the application restarts, so your application should store the resume token.
For more information and sample code for resuming change streams, see the official documentation.
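To make the idea concrete, here is a minimal sketch of passing a stored resume token when opening a change stream. The function name watchWithResume and the saveToken callback are hypothetical, and where the token is persisted (a file, another collection, and so on) is left to the application. The resumeAfter option of watch() and the _id field of the change event are the parts that come from the driver.

```javascript
// Open a change stream, resuming after `savedToken` if the application has one.
// `saveToken` is a hypothetical callback that persists the token somewhere durable.
function watchWithResume(collection, pipeline, savedToken, saveToken) {
    // Only pass resumeAfter when there is a token to resume from
    const options = savedToken ? { resumeAfter: savedToken } : {};
    const changeStream = collection.watch(pipeline, options);

    changeStream.on("change", (changeEvent) => {
        // Handle the event first...
        console.log(changeEvent);
        // ...then remember its resume token, which is the _id of the change event
        saveToken(changeEvent._id);
    });

    return changeStream;
}
```

Saving the token only after the event has been handled means a crash between the two steps replays that event on restart rather than skipping it, so the handler should tolerate seeing an event twice.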

What are MongoDB Atlas Triggers?

Change streams allow you to react immediately to changes in your database. If you want to monitor changes continuously, keeping the application that watches the change stream always up and never missing an event is possible, but it can be challenging. This is where MongoDB Atlas triggers come in.
MongoDB supports triggers in Atlas. Atlas triggers allow you to execute functions in real time based on database events (just like change streams) or on scheduled intervals (like a cron job). Atlas triggers have a few big advantages:
  • You don't have to worry about programming the change stream. You simply program the function that will be executed when the database event is fired.
  • You don't have to worry about managing the server where your change stream code is running. Atlas takes care of the server management for you.
  • You get a handy UI to configure your trigger, which means you have less code to write.
Atlas triggers do have a few constraints. The biggest constraint I hit in the past was that functions did not support module imports (i.e. import and require). That has changed, and you can now upload external dependencies that you can use in your functions. See Upload External Dependencies for more information. To learn more about functions and their constraints, see the official Realm Functions documentation.

Create a MongoDB Atlas Trigger

Just as we did in earlier sections, let's look for new listings in the Sydney, Australia market. Instead of working locally in a code editor to create and monitor a change stream, we'll create a trigger in the Atlas web UI.
Create a Trigger
Let's create an Atlas trigger that will monitor the listingsAndReviews collection and call a function whenever a new listing is added in the Sydney, Australia market.
  1. Navigate to your project in Atlas.
  2. In the Data Storage section of the left navigation pane, click Triggers.
  3. Click Add Trigger. The Add Trigger wizard will appear.
  4. In the Link Data Source(s) selection box, select your cluster that contains the sample_airbnb database and click Link. The changes will be deployed. The deployment may take a minute or two. Scroll to the top of the page to see the status.
  5. In the Select a cluster... selection box, select your cluster that contains the sample_airbnb database.
  6. In the Select a database name... selection box, select sample_airbnb.
  7. In the Select a collection name... selection box, select listingsAndReviews.
  8. In the Operation Type section, check the box beside Insert.
  9. In the Function code box, replace the commented code with a call to log the change event.
  10. We can create a $match statement to filter our change events just as we did earlier with the aggregation pipeline we passed to the change stream in our Node.js script. Expand the ADVANCED (OPTIONAL) section at the bottom of the page and, in the Match Expression code box, paste an expression that matches only listings in the Sydney, Australia market.
  11. Click Save. The trigger will be enabled. From that point on, the function to log the change event will be called whenever a new document in the Sydney, Australia market is inserted in the listingsAndReviews collection.
Fire the Trigger
Now that we have the trigger configured, let's create sample data that will fire the trigger.
  1. Return to the shell on your local machine.
  2. Create and update sample data by executing node changeStreamsTestData.js in a new shell.
View the Trigger Results
When you created the trigger, MongoDB Atlas automatically created a Realm application for you named Triggers_RealmApp.
The function associated with your trigger doesn't currently do much. It simply prints the change event document. Let's view the results in the logs of the Realm app associated with your trigger.
  1. Return to your browser where you are viewing your trigger in Atlas.
  2. In the navigation bar toward the top of the page, click Realm.
  3. In the Applications pane, click Triggers_RealmApp. The Triggers_RealmApp Realm application will open.
  4. In the MANAGE section of the left navigation pane, click Logs. Two entries will be displayed in the Logs pane—one for each of the listings in the Sydney, Australia market that was inserted into the collection.
  5. Click the arrow at the beginning of each row in the Logs pane to expand the log entry. Here you can see the full document that was inserted.
If you insert more listings in the Sydney, Australia market, you can refresh the Logs page to see the change events.

Wrapping Up

Today we explored four different ways to accomplish the same task of reacting immediately to changes in the database. We began by writing a Node.js script that monitored a change stream using Node.js's built-in EventEmitter class. Next we updated the Node.js script to monitor a change stream using the MongoDB Node.js Driver's ChangeStream class. Then we updated the Node.js script to monitor a change stream using the Stream API. Finally, we created an Atlas trigger to monitor changes. In all four cases, we were able to use $match to filter the change stream events.
This post included many code snippets that built on code written in the first post of this MongoDB and Node.js Quick Start series. To get a full copy of the code used in today's post, visit the Node.js Quick Start GitHub Repo.
The examples we explored today all did relatively simple things whenever an event was fired: they logged the change events. Change streams and triggers become really powerful when you start doing more in response to change events. For example, you might want to fire alarms, send emails, place orders, update other systems, or do other amazing things.
This is the final post in the Node.js and MongoDB Quick Start Series (at least for now!). I hope you've enjoyed it! If you have ideas for other topics you'd like to see covered, let me know in the MongoDB Community.
