Change Streams & Triggers with Node.js Tutorial

Lauren Schaefer • 17 min read • Published Feb 04, 2022 • Updated Aug 24, 2023
Sometimes you need to react immediately to changes in your database. Perhaps you want to place an order with a distributor whenever an item's inventory drops below a given threshold. Or perhaps you want to send an email notification whenever the status of an order changes. Regardless of your particular use case, whenever you want to react immediately to changes in your MongoDB database, change streams and triggers are fantastic options.
If you're just joining us in this Quick Start with MongoDB and Node.js series, welcome! We began by walking through how to connect to MongoDB and perform each of the CRUD (Create, Read, Update, and Delete) operations. Then we jumped into more advanced topics like the aggregation framework and transactions. The code we write today will use the same structure as the code we built in the first post in the series, so, if you have any questions about how to get started or how the code is structured, head back to that post.
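And, with that, let's dive into change streams and triggers! Here is a summary of what we'll cover today:
  • What are Change Streams?
  • Set Up
  • Create a Change Stream
  • Resume a Change Stream
  • What are MongoDB Atlas Triggers?
  • Create a MongoDB Atlas Trigger
  • Wrapping Up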
Prefer a video over an article? Check out the video below that covers the exact same topics that I discuss in this article.
Get started with an M0 cluster on Atlas today. It's free forever, and it's the easiest way to try out the steps in this blog series.

What are Change Streams?

Change streams allow you to receive notifications about changes made to your MongoDB databases and collections. When you use change streams, you can choose to program actions that will be automatically taken whenever a change event occurs.
Change streams utilize the aggregation framework, so you can choose to filter for specific change events or transform the change event documents.
For example, let's say I want to be notified whenever a new listing in the Sydney, Australia market is added to the listingsAndReviews collection. I could create a change stream that monitors the listingsAndReviews collection and use an aggregation pipeline to match on the listings I'm interested in.
Let's take a look at three different ways to implement this change stream.

Set Up

As with all posts in this MongoDB and Node.js Quick Start series, you'll need to ensure you've completed the prerequisite steps outlined in the Set up section of the first post in this series.
I find it helpful to have a script that will generate sample data when I'm testing change streams. To help you quickly generate sample data, I wrote changeStreamsTestData.js. Download a copy of the file, update the uri constant to reflect your Atlas connection info, and run it by executing node changeStreamsTestData.js. The script will do the following:
  1. Create 3 new listings (Opera House Views, Private room in London, and Beautiful Beach House)
  2. Update 2 of those listings (Opera House Views and Beautiful Beach House)
  3. Create 2 more listings (Italian Villa and Sydney Harbour Home)
  4. Delete a listing (Sydney Harbour Home).

Create a Change Stream

Now that we're set up, let's explore three different ways to work with a change stream in Node.js.

Get a Copy of the Node.js Template

To make following along with this blog post easier, I've created a starter template for a Node.js script that accesses an Atlas cluster.
  1. Download a copy of template.js.
  2. Open template.js in your favorite code editor.
  3. Update the Connection URI to point to your Atlas cluster. If you're not sure how to do that, refer back to the first post in this series.
  4. Save the file as changeStreams.js.
You can run this file by executing node changeStreams.js in your shell. At this point, the file simply opens and closes a connection to your Atlas cluster, so no output is expected. If you see DeprecationWarnings, you can ignore them for the purposes of this post.

Create a Helper Function to Close the Change Stream

Regardless of how we monitor changes in our change stream, we will want to close the change stream after a certain amount of time. Let's create a helper function to do just that.
  1. Paste the following function in changeStreams.js.
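
The helper's listing is not reproduced in this copy of the post, so here is a minimal sketch of what such a function could look like. The name closeChangeStream, the parameter order, and the 60-second default are assumptions made for illustration; the only driver call it relies on is ChangeStream's close().

```javascript
/**
 * Close the given change stream after the given amount of time.
 * Sketch only: the function name and the 60-second default are assumptions.
 * @param {number} timeInMs How long to wait before closing, in milliseconds
 * @param {object} changeStream An open change stream (anything with close())
 * @returns {Promise<void>} Resolves once the change stream has been closed
 */
function closeChangeStream(timeInMs = 60000, changeStream) {
  return new Promise((resolve, reject) => {
    setTimeout(async () => {
      try {
        console.log("Closing the change stream");
        // close() returns a promise in current drivers, so wait for it.
        await changeStream.close();
        resolve();
      } catch (error) {
        reject(error);
      }
    }, timeInMs);
  });
}
```

Returning a promise lets the calling function await the helper, which keeps the script (and its connection) alive until the timer has fired.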

Monitor Change Stream using EventEmitter's on()

The MongoDB Node.js Driver's ChangeStream class inherits from the Node.js built-in class EventEmitter. As a result, we can use EventEmitter's on() function to add a listener function that will be called whenever a change occurs in the change stream.

Create the Function

Let's create a function that will monitor changes in the change stream using EventEmitter's on().
  1. Continuing to work in changeStreams.js, create an asynchronous function named monitorListingsUsingEventEmitter. The function should have the following parameters: a connected MongoClient, a time in ms that indicates how long the change stream should be monitored, and an aggregation pipeline that the change stream will use.
  2. Now we need to access the collection we will monitor for changes. Add the following code to monitorListingsUsingEventEmitter().
  3. Now we are ready to create our change stream. We can do so by using Collection's watch(). Add the following line beneath the existing code in monitorListingsUsingEventEmitter().
  4. Once we have our change stream, we can add a listener to it. Let's log each change event in the console. Add the following line beneath the existing code in monitorListingsUsingEventEmitter().
  5. We could choose to leave the change stream open indefinitely. Instead, let's call our helper function to set a timer and close the change stream. Add the following line beneath the existing code in monitorListingsUsingEventEmitter().
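
Put together, the five steps above might produce something like the following sketch. The default values for the parameters are assumptions, and the timer helper (here called closeChangeStream, also an assumed name) is repeated in compact form only so the snippet runs on its own. The database name sample_airbnb is the one used later in this post.

```javascript
// Assumed helper: closes the change stream after timeInMs milliseconds.
function closeChangeStream(timeInMs = 60000, changeStream) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      console.log("Closing the change stream");
      Promise.resolve(changeStream.close()).then(resolve, reject);
    }, timeInMs);
  });
}

/**
 * Monitor listingsAndReviews for changes using EventEmitter's on().
 * @param {object} client A connected MongoClient
 * @param {number} timeInMs How long the change stream should be monitored
 * @param {object[]} pipeline Aggregation pipeline the change stream will use
 */
async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeline = []) {
  // Step 2: access the collection we want to monitor.
  const collection = client.db("sample_airbnb").collection("listingsAndReviews");

  // Step 3: create the change stream with Collection's watch().
  const changeStream = collection.watch(pipeline);

  // Step 4: log each change event as it arrives.
  changeStream.on("change", (next) => {
    console.log(next);
  });

  // Step 5: wait for the timer, then close the change stream.
  await closeChangeStream(timeInMs, changeStream);
}
```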

Call the Function

Now that we've implemented our function, let's call it!
  1. Inside of main() beneath the comment that says Make the appropriate DB calls, call your monitorListingsUsingEventEmitter() function:
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 60 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Output similar to the following will be displayed in your first shell where you are running changeStreams.js.
    If you run node changeStreamsTestData.js again before the 60 second timer has completed, you will see similar output.
    After 60 seconds, the following will be displayed:

Call the Function with an Aggregation Pipeline

In some cases, you will not care about all change events that occur in a collection. Instead, you will want to limit what changes you are monitoring. You can use an aggregation pipeline to filter the changes or transform the change stream event documents.
In our case, we only care about new listings in the Sydney, Australia market. Let's create an aggregation pipeline to filter for only those changes in the listingsAndReviews collection.
To learn more about what aggregation pipeline stages can be used with change streams, see the official change streams documentation.
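
As a rough sketch, a pipeline for that filter could use a single $match stage. The operationType and fullDocument fields are part of every insert change event; the address.country and address.market field names are assumptions based on the shape of the documents in the sample listingsAndReviews collection.

```javascript
// Only pass along insert events for listings in the Sydney, Australia market.
// The address.* field names are assumed from the sample_airbnb dataset.
const pipeline = [
  {
    $match: {
      operationType: "insert",
      "fullDocument.address.country": "Australia",
      "fullDocument.address.market": "Sydney",
    },
  },
];
```

The pipeline is passed as the third argument to the monitoring function, for example monitorListingsUsingEventEmitter(client, 30000, pipeline). Because updates and deletes never match operationType "insert", they are filtered out before they reach the listener.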
  1. Inside of main() and above your existing call to monitorListingsUsingEventEmitter(), create an aggregation pipeline:
  2. Let's use this pipeline to filter the changes in our change stream. Update your existing call to monitorListingsUsingEventEmitter() to only leave the change stream open for 30 seconds and use the pipeline.
  3. Save your file.
  4. Run your script by executing node changeStreams.js in your shell. The change stream will open for 30 seconds.
  5. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Because the change stream is using the pipeline you just created, only documents inserted into the listingsAndReviews collection that are in the Sydney, Australia market will be in the change stream. Output similar to the following will be displayed in your first shell where you are running changeStreams.js.
    After 30 seconds, the following will be displayed:

Monitor Change Stream using ChangeStream's hasNext()

In the section above, we used EventEmitter's on() to monitor the change stream. Alternatively, we can create a while loop that waits for the next element in the change stream by using hasNext() from the MongoDB Node.js Driver's ChangeStream class.

Create the Function

Let's create a function that will monitor changes in the change stream using ChangeStream's hasNext().
  1. Continuing to work in changeStreams.js, create an asynchronous function named monitorListingsUsingHasNext. The function should have the following parameters: a connected MongoClient, a time in ms that indicates how long the change stream should be monitored, and an aggregation pipeline that the change stream will use.
  2. Now we need to access the collection we will monitor for changes. Add the following code to monitorListingsUsingHasNext().
  3. Now we are ready to create our change stream. We can do so by using Collection's watch(). Add the following line beneath the existing code in monitorListingsUsingHasNext().
  4. We could choose to leave the change stream open indefinitely. Instead, let's call our helper function that will set a timer and close the change stream. Add the following line beneath the existing code in monitorListingsUsingHasNext().
  5. Now let's create a while loop that will wait for new changes in the change stream. We can use ChangeStream's hasNext() inside of the while loop. hasNext() will wait to return true until a new change arrives in the change stream. hasNext() will throw an error as soon as the change stream is closed, so we will surround our while loop with a try { } block. If an error is thrown, we'll check to see if the change stream is closed. If the change stream is closed, we'll log that information. Otherwise, something unexpected happened, so we'll throw the error. Add the following code beneath the existing code in monitorListingsUsingHasNext().
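
The steps above could be sketched as follows. The parameter defaults and the helper name closeChangeStream are assumptions, and the helper is repeated in compact form so the snippet is self-contained. The sketch checks the closed property that recent driver versions expose on ChangeStream; older 3.x drivers offered an isClosed() method instead, so adjust to the version you have installed.

```javascript
// Assumed helper: closes the change stream after timeInMs milliseconds.
function closeChangeStream(timeInMs = 60000, changeStream) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      console.log("Closing the change stream");
      Promise.resolve(changeStream.close()).then(resolve, reject);
    }, timeInMs);
  });
}

/**
 * Monitor listingsAndReviews for changes using ChangeStream's hasNext().
 * @param {object} client A connected MongoClient
 * @param {number} timeInMs How long the change stream should be monitored
 * @param {object[]} pipeline Aggregation pipeline the change stream will use
 */
async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline = []) {
  const collection = client.db("sample_airbnb").collection("listingsAndReviews");
  const changeStream = collection.watch(pipeline);

  // Start the timer now, but do not await it yet: the loop below has to run
  // while the timer is counting down.
  const closing = closeChangeStream(timeInMs, changeStream);

  try {
    // hasNext() waits until a change arrives before resolving to true.
    while (await changeStream.hasNext()) {
      console.log(await changeStream.next());
    }
  } catch (error) {
    if (changeStream.closed) {
      console.log("The change stream is closed. Will not wait on any more changes.");
    } else {
      // Something unexpected happened, so surface it to the caller.
      throw error;
    }
  }

  // Make sure the helper has finished closing the stream before returning.
  await closing;
}
```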

Call the Function

Now that we've implemented our function, let's call it!
  1. Inside of main(), replace your existing call to monitorListingsUsingEventEmitter() with a call to your new monitorListingsUsingHasNext():
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 60 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Output similar to what we saw earlier will be displayed in your first shell where you are running changeStreams.js. If you run node changeStreamsTestData.js again before the 60 second timer has completed, you will see similar output again. After 60 seconds, the following will be displayed:

Call the Function with an Aggregation Pipeline

As we discussed earlier, sometimes you will want to use an aggregation pipeline to filter the changes in your change stream or transform the change stream event documents. Let's pass the aggregation pipeline we created in an earlier section to our new function.
  1. Update your existing call to monitorListingsUsingHasNext() to only leave the change stream open for 30 seconds and use the aggregation pipeline.
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 30 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Because the change stream is using the pipeline you just created, only documents inserted into the listingsAndReviews collection that are in the Sydney, Australia market will be in the change stream. Output similar to what we saw earlier while using a change stream with an aggregation pipeline will be displayed in your first shell where you are running changeStreams.js. After 30 seconds, the following will be displayed:

Monitor Change Stream using the Stream API

In the previous two sections, we used EventEmitter's on() and ChangeStream's hasNext() to monitor changes. Let's examine a third way to monitor a change stream: using Node's Stream API.

Load the Stream Module

In order to use the Stream module, we will need to load it.
  1. Continuing to work in changeStreams.js, load the Stream module at the top of the file.

Create the Function

Let's create a function that will monitor changes in the change stream using the Stream API.
  1. Continuing to work in changeStreams.js, create an asynchronous function named monitorListingsUsingStreamAPI. The function should have the following parameters: a connected MongoClient, a time in ms that indicates how long the change stream should be monitored, and an aggregation pipeline that the change stream will use.
  2. Now we need to access the collection we will monitor for changes. Add the following code to monitorListingsUsingStreamAPI().
  3. Now we are ready to create our change stream. We can do so by using Collection's watch(). Add the following line beneath the existing code in monitorListingsUsingStreamAPI().
  4. Now we're ready to monitor our change stream. ChangeStream's stream() will return a Node Readable stream. We will call Readable's pipe() to pull the data out of the stream and write it to the console.
  5. We could choose to leave the change stream open indefinitely. Instead, let's call our helper function that will set a timer and close the change stream. Add the following line beneath the existing code in monitorListingsUsingStreamAPI().
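
A sketch of the finished function, including the module load from the previous section, might look like this. As before, the parameter defaults and the helper name closeChangeStream are assumptions, and the helper is repeated in compact form so the snippet runs on its own. The Writable is created in object mode because the change stream emits documents rather than strings or buffers.

```javascript
// Load Node's built-in Stream module (this line belongs at the top of the file).
const stream = require("stream");

// Assumed helper: closes the change stream after timeInMs milliseconds.
function closeChangeStream(timeInMs = 60000, changeStream) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      console.log("Closing the change stream");
      Promise.resolve(changeStream.close()).then(resolve, reject);
    }, timeInMs);
  });
}

/**
 * Monitor listingsAndReviews for changes using the Stream API.
 * @param {object} client A connected MongoClient
 * @param {number} timeInMs How long the change stream should be monitored
 * @param {object[]} pipeline Aggregation pipeline the change stream will use
 */
async function monitorListingsUsingStreamAPI(client, timeInMs = 60000, pipeline = []) {
  const collection = client.db("sample_airbnb").collection("listingsAndReviews");
  const changeStream = collection.watch(pipeline);

  // stream() returns a Readable; pipe() pulls each change event out of it
  // and hands it to the Writable, which logs it to the console.
  changeStream.stream().pipe(
    new stream.Writable({
      objectMode: true,
      write(doc, _encoding, callback) {
        console.log(doc);
        callback();
      },
    })
  );

  // Wait for the timer, then close the change stream.
  await closeChangeStream(timeInMs, changeStream);
}
```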

Call the Function

Now that we've implemented our function, let's call it!
  1. Inside of main(), replace your existing call to monitorListingsUsingHasNext() with a call to your new monitorListingsUsingStreamAPI():
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 60 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Output similar to what we saw earlier will be displayed in your first shell where you are running changeStreams.js. If you run node changeStreamsTestData.js again before the 60 second timer has completed, you will see similar output again. After 60 seconds, the following will be displayed:

Call the Function with an Aggregation Pipeline

As we discussed earlier, sometimes you will want to use an aggregation pipeline to filter the changes in your change stream or transform the change stream event documents. Let's pass the aggregation pipeline we created in an earlier section to our new function.
  1. Update your existing call to monitorListingsUsingStreamAPI() to only leave the change stream open for 30 seconds and use the aggregation pipeline.
  2. Save your file.
  3. Run your script by executing node changeStreams.js in your shell. The change stream will open for 30 seconds.
  4. Create and update sample data by executing node changeStreamsTestData.js in a new shell. Because the change stream is using the pipeline you just created, only documents inserted into the listingsAndReviews collection that are in the Sydney, Australia market will be in the change stream. Output similar to what we saw earlier while using a change stream with an aggregation pipeline will be displayed in your first shell where you are running changeStreams.js. After 30 seconds, the following will be displayed:

Resume a Change Stream

At some point, your application will likely lose the connection to the change stream. Perhaps a network error will occur and a connection between the application and the database will be dropped. Or perhaps your application will crash and need to be restarted (but you're a 10x developer and that would never happen to you, right?).
In those cases, you may want to resume the change stream where you previously left off so you don't lose any of the change events.
Each change stream event document contains a resume token, which is stored in the _id field of the change event document.
The application can pass the resume token when creating a new change stream. The change stream will include all events that happened after the event associated with the given resume token.
The MongoDB Node.js driver will automatically attempt to reestablish connections in the event of transient network errors or elections. In those cases, the driver will use its cached copy of the most recent resume token so that no change stream events are lost.
In the event of an application failure or restart, the application will need to pass the resume token when creating the change stream in order to ensure no change stream events are lost. Keep in mind that the driver will lose its cached copy of the most recent resume token when the application restarts, so your application should store the resume token.
For more information and sample code for resuming change streams, see the official documentation.
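
To make the restart case concrete, here is a minimal sketch of passing a saved token back to watch(). The function name watchWithResume and the savedToken and onToken parameters are hypothetical; resumeAfter is the watch() option that accepts a resume token. How and where the token is persisted (a file, another collection, and so on) is left to the application.

```javascript
/**
 * Open a change stream, resuming after a previously saved token if there is one.
 * Hypothetical helper for illustration; not part of the driver.
 * @param {object} collection The collection to watch
 * @param {object[]} pipeline Aggregation pipeline the change stream will use
 * @param {object|null} savedToken Resume token stored by an earlier run, or null
 * @param {function} onToken Called with each event's resume token so it can be stored
 * @returns {object} The open change stream
 */
function watchWithResume(collection, pipeline, savedToken, onToken) {
  // Without a saved token, the stream simply starts from the current time.
  const options = savedToken ? { resumeAfter: savedToken } : {};
  const changeStream = collection.watch(pipeline, options);

  changeStream.on("change", (event) => {
    console.log(event);
    // The _id of a change event is its resume token; hand it to the caller
    // so it survives an application restart.
    onToken(event._id);
  });

  return changeStream;
}
```

Storing the token only after an event has been fully processed means a crash mid-processing replays that event on restart rather than skipping it.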

What are MongoDB Atlas Triggers?

Change streams allow you to react immediately to changes in your database. If you want to monitor changes continuously, though, you have to make sure the application watching the change stream is always up and never misses an event. That is possible, but it can be challenging. This is where MongoDB Atlas triggers come in.
MongoDB supports triggers in Atlas. Atlas triggers allow you to execute functions in real time based on database events (just like change streams) or on scheduled intervals (like a cron job). Atlas triggers have a few big advantages:
  • You don't have to worry about programming the change stream. You simply program the function that will be executed when the database event is fired.
  • You don't have to worry about managing the server where your change stream code is running. Atlas takes care of the server management for you.
  • You get a handy UI to configure your trigger, which means you have less code to write.
Atlas triggers do have a few constraints. The biggest constraint I hit in the past was that functions did not support module imports (i.e. import and require). That has changed, and you can now upload external dependencies that you can use in your functions. See Upload External Dependencies for more information. To learn more about functions and their constraints, see the official Realm Functions documentation.

Create a MongoDB Atlas Trigger

Just as we did in earlier sections, let's look for new listings in the Sydney, Australia market. Instead of working locally in a code editor to create and monitor a change stream, we'll create a trigger in the Atlas web UI.

Create a Trigger

Let's create an Atlas trigger that will monitor the listingsAndReviews collection and call a function whenever a new listing is added in the Sydney, Australia market.
  1. Navigate to your project in Atlas.
  2. In the Data Storage section of the left navigation pane, click Triggers.
  3. Click Add Trigger. The Add Trigger wizard will appear.
  4. In the Link Data Source(s) selection box, select your cluster that contains the sample_airbnb database and click Link. The changes will be deployed. The deployment may take a minute or two. Scroll to the top of the page to see the status.
  5. In the Select a cluster... selection box, select your cluster that contains the sample_airbnb database.
  6. In the Select a database name... selection box, select sample_airbnb.
  7. In the Select a collection name... selection box, select listingsAndReviews.
  8. In the Operation Type section, check the box beside Insert.
  9. In the Function code box, replace the commented code with a call to log the change event. The code should now look like the following:
  10. We can create a $match statement to filter our change events just as we did earlier with the aggregation pipeline we passed to the change stream in our Node.js script. Expand the ADVANCED (OPTIONAL) section at the bottom of the page and paste the following in the Match Expression code box.
  11. Click Save. The trigger will be enabled. From that point on, the function to log the change event will be called whenever a new document in the Sydney, Australia market is inserted in the listingsAndReviews collection.

Fire the Trigger

Now that we have the trigger configured, let's create sample data that will fire the trigger.
  1. Return to the shell on your local machine.
  2. Create and update sample data by executing node changeStreamsTestData.js in a new shell.

View the Trigger Results

When you created the trigger, MongoDB Atlas automatically created a Realm application for you named Triggers_RealmApp.
The function associated with your trigger doesn't currently do much. It simply prints the change event document. Let's view the results in the logs of the Realm app associated with your trigger.
  1. Return to your browser where you are viewing your trigger in Atlas.
  2. In the navigation bar toward the top of the page, click Realm.
  3. In the Applications pane, click Triggers_RealmApp. The Triggers_RealmApp Realm application will open.
  4. In the MANAGE section of the left navigation pane, click Logs. Two entries will be displayed in the Logs pane—one for each of the listings in the Sydney, Australia market that was inserted into the collection.
  5. Click the arrow at the beginning of each row in the Logs pane to expand the log entry. Here you can see the full document that was inserted.
If you insert more listings in the Sydney, Australia market, you can refresh the Logs page to see the change events.

Wrapping Up

Today we explored four different ways to accomplish the same task of reacting immediately to changes in the database. We began by writing a Node.js script that monitored a change stream using Node.js's built-in EventEmitter class. Next, we updated the Node.js script to monitor a change stream using hasNext() from the MongoDB Node.js Driver's ChangeStream class. Then we updated the Node.js script to monitor a change stream using the Stream API. Finally, we created an Atlas trigger to monitor changes. In all four cases, we were able to use $match to filter the change stream events.
This post included many code snippets that built on code written in the first post of this MongoDB and Node.js Quick Start series. To get a full copy of the code used in today's post, visit the Node.js Quick Start GitHub Repo.
The examples we explored today all did relatively simple things whenever an event was fired: they logged the change events. Change streams and triggers become really powerful when you start doing more in response to change events. For example, you might want to fire alarms, send emails, place orders, update other systems, or do other amazing things.
This is the final post in the Node.js and MongoDB Quick Start Series (at least for now!). I hope you've enjoyed it! If you have ideas for other topics you'd like to see covered, let me know in the MongoDB Community.
