
Change Streams

On this page

  • Availability
  • Connect
  • Watch a Collection, Database, or Deployment
  • Change Stream Performance Considerations
  • Open A Change Stream
  • Modify Change Stream Output
  • Lookup Full Document for Update Operations
  • Resume a Change Stream
  • Use Cases
  • Access Control
  • Event Notification
  • Collation
  • Change Streams and Orphan Documents
  • Change Streams with Document Pre- and Post-Images

Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Starting in MongoDB 5.1, change streams are optimized, providing more efficient resource utilization and faster execution of some aggregation pipeline stages.

Change streams are available for replica sets and sharded clusters.

Connections for a change stream can either use DNS seed lists with the +srv connection option or list the servers individually in the connection string.

If the driver loses the connection to a change stream or the connection goes down, it attempts to reestablish a connection to the change stream through another node in the cluster that has a matching read preference. If the driver cannot find a node with the correct read preference, it throws an exception.

For more information, see Connection String URI Format.

You can open change streams against the following targets:

A collection

You can open a change stream cursor for a single collection (except system collections, or any collections in the admin, local, and config databases).

The examples on this page use the MongoDB drivers to open and work with a change stream cursor for a single collection. See also the mongosh method db.collection.watch().

A database

Starting in MongoDB 4.0, you can open a change stream cursor for a single database (excluding the admin, local, and config databases) to watch for changes to all its non-system collections.

For the MongoDB driver method, refer to your driver documentation. See also the mongosh method db.watch().

A deployment

Starting in MongoDB 4.0, you can open a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases except for admin, local, and config.

For the MongoDB driver method, refer to your driver documentation. See also the mongosh method Mongo.watch().
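
For example, in mongosh each target has a corresponding watch() helper. The following is a minimal sketch; the hr database and employees collection are placeholder names, not part of the manual's example set:

db.getSiblingDB("hr").employees.watch()   // a single collection
db.getSiblingDB("hr").watch()             // a single database
db.getMongo().watch()                     // the entire deployment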

Note

Change Stream Examples

The examples on this page use the MongoDB drivers to illustrate how to open a change stream cursor for a collection and work with the change stream cursor.

If the number of active change streams opened against a database exceeds the connection pool size, you may experience notification latency. Each change stream uses a connection and a getMore operation on the change stream while it waits for the next event. To avoid latency issues, ensure that the pool size is greater than the number of open change streams. For details, see the maxPoolSize setting.

To open a change stream:

  • For a replica set, you can issue the open change stream operation from any of the data-bearing members.

  • For a sharded cluster, you must issue the open change stream operation from the mongos.

The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. [1]


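A minimal mongosh sketch of this pattern follows; the hr database and employees collection are placeholder names:

watchCursor = db.getSiblingDB("hr").employees.watch()
while (!watchCursor.isClosed()) {
   // tryNext() returns the next event if one is available, or null
   let next = watchCursor.tryNext()
   while (next !== null) {
      printjson(next)
      next = watchCursor.tryNext()
   }
}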


To retrieve the data change event from the cursor, iterate the change stream cursor. For information on the change stream event, see Change Events.

The change stream cursor remains open until one of the following occurs:

  • The cursor is explicitly closed.

  • An invalidate event occurs; for example, a collection drop or rename.

  • The connection to the MongoDB deployment closes or times out. See Cursor Behaviors for more information.

  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.

Note

The lifecycle of an unclosed cursor is language-dependent.

[1] Starting in MongoDB 4.0, you can specify a startAtOperationTime to open the cursor at a particular point in time. If the specified starting point is in the past, it must be in the time range of the oplog.

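You can pass an aggregation pipeline to the watch() method to filter or transform the notifications. A minimal sketch that returns only insert and update events (placeholder names as before):

filteredCursor = db.getSiblingDB("hr").employees.watch(
   [ { $match: { operationType: { $in: [ "insert", "update" ] } } } ]
)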


Tip

The _id field of the change stream event document acts as the resume token. Do not use the pipeline to modify or remove the change stream event's _id field.

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.

See Change Events for more information on the change stream response document format.

By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.


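A minimal sketch that enables this lookup with the fullDocument option (placeholder names as before):

lookupCursor = db.getSiblingDB("hr").employees.watch(
   [],
   { fullDocument: "updateLookup" }
)

With this option, update notifications include a fullDocument field containing the current majority-committed version of the document.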


Note

If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.

However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.

See Change Events for more information on the change stream response document format.

Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.

You can resume a change stream after a specific event by passing a resume token to resumeAfter when opening the cursor.

See Resume Tokens for more information on the resume token.

Important

  • The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.

  • You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.
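
Building on the earlier sketch, the following shows saving a token and resuming from it (placeholder names as before; resumeToken is the _id of the last event the application processed):

let lastEvent = watchCursor.next()
let resumeToken = lastEvent._id
resumedCursor = db.getSiblingDB("hr").employees.watch(
   [],
   { resumeAfter: resumeToken }
)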

New in version 4.2.

You can start a new change stream after a specific event by passing a resume token to startAfter when opening the cursor. Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.

See Resume Tokens for more information on the resume token.

Important

  • The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.
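
A minimal sketch using startAfter; it assumes the application saved the _id of the invalidate event that closed the original stream (placeholder names as before):

// invalidateToken holds the _id of the invalidate event
newCursor = db.getSiblingDB("hr").employees.watch(
   [],
   { startAfter: invalidateToken }
)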

The resume token is available from multiple sources:

  • Each change event notification includes a resume token on the _id field.

  • The $changeStream aggregation stage includes a resume token on the cursor.postBatchResumeToken field. This field only appears when using the aggregate command.

  • The getMore command includes a resume token on the cursor.postBatchResumeToken field.

Changed in version 4.2: Change streams throw an exception if the change stream aggregation pipeline modifies an event's _id field.

Tip

MongoDB provides a "snippet", an extension to mongosh, that decodes hex-encoded resume tokens.

You can install and run the resumetoken snippet from mongosh:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

You can also run resumetoken from the command line (without using mongosh) if npm is installed on your system:

npx mongodb-resumetoken-decoder <RESUME TOKEN>

The following examples show where the resume token appears in each of these sources.

Change event notifications include a resume token on the _id field:

{
   "_id": {
      "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
   },
   "operationType": "insert",
   "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
   "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
   "wallTime": ISODate("2022-10-19T15:37:04.604Z"),
   "fullDocument": {
      "_id": ObjectId("635019a078be67426d7cf4d2"),
      "name": "Giovanni Verga"
   },
   "ns": {
      "db": "test",
      "coll": "names"
   },
   "documentKey": {
      "_id": ObjectId("635019a078be67426d7cf4d2")
   }
}

When using the aggregate command, the $changeStream aggregation stage includes a resume token on the cursor.postBatchResumeToken field:

{
   "cursor": {
      "firstBatch": [],
      "postBatchResumeToken": {
         "_data": "8263515EAC000000022B0429296E1404"
      },
      "id": Long("4309380460777152828"),
      "ns": "test.names"
   },
   "ok": 1,
   "$clusterTime": {
      "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
      "signature": {
         "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
         "keyId": Long("0")
      }
   },
   "operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

The getMore command also includes a resume token on the cursor.postBatchResumeToken field:

{
   "cursor": {
      "nextBatch": [],
      "postBatchResumeToken": {
         "_data": "8263515979000000022B0429296E1404"
      },
      "id": Long("7049907285270685005"),
      "ns": "test.names"
   },
   "ok": 1,
   "$clusterTime": {
      "clusterTime": Timestamp({ "t": 1666275705, "i": 1 }),
      "signature": {
         "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
         "keyId": Long("0")
      }
   },
   "operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

Change streams can benefit architectures with dependent business systems, informing downstream systems as soon as data changes are durable. For example, change streams can save developers time when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.

For deployments enforcing authentication and authorization:

  • To open a change stream against a specific collection, applications must have privileges that grant changeStream and find actions on the corresponding collection.

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • To open a change stream on a single database, applications must have privileges that grant changeStream and find actions on all non-system collections in the database.

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • To open a change stream on an entire deployment, applications must have privileges that grant changeStream and find actions on all non-system collections for all databases in the deployment.

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.

For example, consider a 3-member replica set with a change stream cursor opened against the primary. If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.

If an operation is associated with a transaction, the change event document includes the txnNumber and the lsid.

Starting in MongoDB 4.2, change streams use simple binary comparisons unless an explicit collation is provided. In earlier versions, change streams opened on a single collection (db.collection.watch()) would inherit that collection's default collation.

Starting in MongoDB 5.3, during range migration, change stream events are not generated for updates to orphaned documents.

Starting in MongoDB 6.0, you can use change stream events to output the version of a document before and after changes (the document pre- and post-images):

  • The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.

  • The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.

  • Enable changeStreamPreAndPostImages for a collection using db.createCollection(), create, or collMod, as shown in the example below.
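
For example, a minimal sketch that enables pre- and post-images at collection creation time (the temperatureSensor collection name is a placeholder):

db.createCollection(
   "temperatureSensor",
   { changeStreamPreAndPostImages: { enabled: true } }
)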

Pre- and post-images are not available for a change stream event if the images were:

  • Not enabled on the collection at the time of a document update or delete operation.

  • Removed after the pre- and post-image retention time set in expireAfterSeconds.

    • The following example sets expireAfterSeconds to 100 seconds on an entire cluster:

      use admin
      db.runCommand( {
         setClusterParameter: {
            changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } }
         }
      } )
    • expireAfterSeconds is a cluster-wide parameter; you cannot set a different pre- and post-image retention time for a specific collection. To control images for a specific collection, enable or disable changeStreamPreAndPostImages instead. For example:

      db.runCommand( {
         collMod: "sensors",
         changeStreamPreAndPostImages: { enabled: true }
      } )
    • The following example returns the current changeStreamOptions settings, including expireAfterSeconds:

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • Setting expireAfterSeconds to off uses the default retention policy: pre- and post-images are retained until the corresponding change stream events are removed from the oplog.

    • If a change stream event is removed from the oplog, then the corresponding pre- and post-images are also deleted regardless of the expireAfterSeconds pre- and post-image retention time.

Additional considerations:

  • Enabling pre- and post-images consumes storage space and adds processing time. Only enable pre- and post-images if you need them.

  • Limit the change stream event size to less than 16 megabytes. To limit the event size, you can:

    • Limit the document size to 8 megabytes. You can request pre- and post-images simultaneously in the change stream output if other change stream event fields like updateDescription are not large.

    • Request only post-images in the change stream output for documents up to 16 megabytes if other change stream event fields like updateDescription are not large.

    • Request only pre-images in the change stream output for documents up to 16 megabytes if:

      • document updates affect only a small fraction of the document structure or content, and

      • the updates do not cause a replace change event. A replace event always includes the post-image.

  • To request a pre-image, set fullDocumentBeforeChange to required or whenAvailable in db.collection.watch(). To request a post-image, set fullDocument in the same way. See the sketch after this list.

  • Pre-images are written to the config.system.preimages collection.

    • The config.system.preimages collection may become large. To limit the collection size, you can set expireAfterSeconds time for the pre-images as shown earlier.

    • Pre-images are removed asynchronously by a background process.
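
A minimal sketch that requests both images when opening a change stream (the sensors collection name is a placeholder; whenAvailable returns an image when one exists rather than raising an error):

db.sensors.watch(
   [],
   {
      fullDocument: "whenAvailable",
      fullDocumentBeforeChange: "whenAvailable"
   }
)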

Important

Backward-Incompatible Feature

Starting in MongoDB 6.0, if you are using document pre- and post-images for change streams, you must disable changeStreamPreAndPostImages for each collection using the collMod command before you can downgrade to an earlier MongoDB version.

Tip

See also:

For complete examples with the change stream output, see Change Streams with Document Pre- and Post-Images.
