Definition
db.collection.watch( pipeline, options )Important
mongosh Method
This page documents a
mongoshmethod. This is not the documentation for database commands or language-specific drivers, such as Node.js.For the database command, see the
aggregatecommand with the$changeStreamaggregation stage.For MongoDB API drivers, refer to the language-specific MongoDB driver documentation.
For replica sets and sharded clusters only
Opens a change stream cursor on the collection.
ParameterTypeDescriptionpipelinearray
Optional. An Aggregation Pipeline consisting of one or more of the following aggregation stages:
Specify a pipeline to filter/modify the change events output.
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.
optionsdocument
Optional. Additional options that modify the behavior of
watch().The
optionsdocument can contain the following fields and values:FieldTypeDescriptionresumeAfterdocument
Optional. Directs
watch()to attempt resuming notifications starting after the operation specified in the resume token.Each change stream event document includes a resume token as the
_idfield. Pass the entire_idfield of the change event document that represents the operation you want to resume after.resumeAfteris mutually exclusive withstartAfterandstartAtOperationTime.startAfterdocument
Optional. Directs
watch()to attempt starting a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.Each change stream event document includes a resume token as the
_idfield. Pass the entire_idfield of the change event document that represents the operation you want to resume after.startAfteris mutually exclusive withresumeAfterandstartAtOperationTime.fullDocumentstring
Optional. By default,
watch()returns the delta of those fields modified by an update operation, instead of the entire updated document.Set
fullDocumentto"updateLookup"to directwatch()to look up the most current majority-committed version of the updated document.watch()returns afullDocumentfield with the document lookup in addition to theupdateDescriptiondelta.Starting in MongoDB 6.0, you can set
fullDocumentto:"whenAvailable"to output the document post-image, if available, after the document was inserted, replaced, or updated."required"to output the document post-image after the document was inserted, replaced, or updated. Raises an error if the post-image is not available.
fullDocumentBeforeChangestring
Optional.
Starting in MongoDB 6.0, you can use the new
fullDocumentBeforeChangefield and set it to:"whenAvailable"to output the document pre-image, if available, before the document was replaced, updated, or deleted."required"to output the document pre-image before the document was replaced, updated, or deleted. Raises an error if the pre-image is not available."off"to suppress the document pre-image."off"is the default.
batchSizeint
Optional. The maximum number of documents that can be returned in each batch of a change stream. By default,
watch()has an initial batch size of the lesser of101documents or 16 mebibytes (MiB) worth of documents. Subsequent batches have a maximum size of 16 mebibytes.This option can enforce a smaller limit than 16 MiB, but not a larger one. When set, thebatchSizeis the lesser ofbatchSizedocuments or 16 MiB worth of documents.Has the same functionality as
cursor.batchSize().maxAwaitTimeMSint
Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.
Defaults to
1000milliseconds.collationdocument
Optional. Pass a collation document to specify a collation for the change stream cursor.
Defaults to
simplebinary comparison if omitted.showExpandedEventsboolean
Optional. Starting in MongoDB 6.0, change streams support change notifications for DDL events, like the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor using the
showExpandedEventsoption.New in version 6.0.
startAtOperationTimeTimestamp
Optional. The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. To check the time range of the oplog, see
rs.printReplicationInfo().startAtOperationTimeis mutually exclusive withresumeAfterandstartAfter.Returns: A cursor that remains open as long as a connection to the MongoDB deployment remains open and the collection exists. See Change Events for examples of change event documents. Tip
Compatibility
This method is available in deployments hosted in the following environments:
MongoDB Atlas: The fully managed service for MongoDB deployments in the cloud
Note
This command is supported in all MongoDB Atlas clusters. For information on Atlas support for all commands, see Unsupported Commands.
MongoDB Enterprise: The subscription-based, self-managed version of MongoDB
MongoDB Community: The source-available, free-to-use, and self-managed version of MongoDB
Availability
Deployment
db.collection.watch() is available for replica set and
sharded cluster deployments :
For a replica set, you can issue
db.collection.watch()on any data-bearing member.For a sharded cluster, you must issue
db.collection.watch()on amongosinstance.
Storage Engine
You can only use db.collection.watch() with the Wired
Tiger storage engine.
Read Concern majority Support
Change streams are
available regardless of the "majority" read concern
support; that is, read concern majority support can be either
enabled (default) or disabled
to use change streams.
Behavior
db.collection.watch()only notifies on data changes that have persisted to a majority of data-bearing members.The change stream cursor remains open until one of the following occurs:
The cursor is explicitly closed.
An invalidate event occurs; for example, a collection drop or rename.
The connection to the MongoDB deployment closes or times out. See Behavior for more information.
If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close. The closed change stream cursor may not be fully resumable.
Resumability
Unlike the MongoDB Drivers, mongosh does
not automatically attempt to resume a change stream cursor after an
error. The MongoDB drivers make one attempt to automatically resume
a change stream cursor after certain errors.
db.collection.watch() uses information stored in the oplog to produce the
change event description and generate a resume token associated to
that operation. If the operation identified by the resume token
passed to the resumeAfter or startAfter option has already
dropped off the oplog, db.collection.watch() cannot resume the
change stream.
See Resume a Change Stream for more information on resuming a change stream.
Note
You cannot use
resumeAfterto resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Instead, you can use startAfter to start a new change stream after an invalidate event.If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close. The closed change stream cursor may not be fully resumable.
Note
You cannot use resumeAfter to resume a change stream after an
invalidate event (for example, a collection
drop or rename) closes the stream. Instead, you can use
startAfter to start a new change
stream after an invalidate event.
Full Document Lookup of Update Operations
By default, the change stream cursor returns specific field changes/deltas for update operations. You can also configure the change stream to look up and return the current majority-committed version of the changed document. Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.
Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.
Access Control
When running with access control, the user must have the
find and changeStream privilege actions on
the collection resource. That is, a user must
have a role that grants the following privilege:
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
The built-in read role provides the appropriate
privileges.
Cursor Iteration
MongoDB provides multiple ways to iterate on a cursor.
The cursor.hasNext() method blocks and waits for the next
event. To monitor the watchCursor cursor and iterate over the
events, use hasNext() like this:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
The cursor.tryNext() method is non-blocking. To monitor
the watchCursor cursor and iterate over the events, use
tryNext() like this:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Examples
Open a Change Stream
The following operation opens a change stream cursor against the
data.sensors collection:
watchCursor = db.getSiblingDB("data").sensors.watch()
Iterate the cursor to check for new events. Use the
cursor.isClosed() method with the cursor.tryNext()
method to ensure the loop only exits if the change stream cursor is
closed and there are no objects remaining in the latest batch:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
For complete documentation on change stream output, see Change Events.
Note
You cannot use isExhausted() with change streams.
Change Stream with Full Document Update Lookup
Set the fullDocument option to "updateLookup" to direct the
change stream cursor to lookup the most current majority-committed
version of the document associated to an update change stream event.
The following operation opens a change stream cursor against
the data.sensors collection using the
fullDocument : "updateLookup" option.
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
Iterate the cursor to check for new events. Use the
cursor.isClosed() method with the cursor.tryNext()
method to ensure the loop only exits if the change stream cursor is
closed and there are no objects remaining in the latest batch:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
For any update operation, the change event returns the result of the
document lookup in the fullDocument field.
For an example of the full document update output, see change stream update event.
For complete documentation on change stream output, see Change Events.
Change Streams with Document Pre- and Post-Images
Starting in MongoDB 6.0, you can use change stream events to output the version of a document before and after changes (the document pre- and post-images):
The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.
The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.
Enable
changeStreamPreAndPostImagesfor a collection usingdb.createCollection(),create, orcollMod. For example, when using thecollModcommand:db.runCommand( { collMod: <collection>, changeStreamPreAndPostImages: { enabled: true } } )
Pre- and post-images are not available for a change stream event if the images were:
Not enabled on the collection at the time of a document update or delete operation.
Removed after the pre- and post-image retention time set in
expireAfterSeconds.The following example sets
expireAfterSecondsto100seconds on an entire cluster:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) Note
The
setClusterParametercommand is not supported in MongoDB Atlas clusters. For information on Atlas support for all commands, see Unsupported Commands in Atlas.The following example returns the current
changeStreamOptionssettings, includingexpireAfterSeconds:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) Setting
expireAfterSecondstooffuses the default retention policy: pre- and post-images are retained until the corresponding change stream events are removed from the oplog.If a change stream event is removed from the oplog, then the corresponding pre- and post-images are also deleted regardless of the
expireAfterSecondspre- and post-image retention time.
Additional considerations:
Enabling pre- and post-images consumes storage space and adds processing time. Only enable pre- and post-images if you need them.
Limit the change stream event size to less than 16 mebibytes. To limit the event size, you can:
Limit the document size to 8 megabytes. You can request pre- and post-images simultaneously in the change stream output if other change stream event fields like
updateDescriptionare not large.Request only post-images in the change stream output for documents up to 16 mebibytes if other change stream event fields like
updateDescriptionare not large.Request only pre-images in the change stream output for documents up to 16 mebibytes if:
document updates affect only a small fraction of the document structure or content, and
do not cause a
replacechange event. Areplaceevent always includes the post-image.
To request a pre-image, you set
fullDocumentBeforeChangetorequiredorwhenAvailableindb.collection.watch(). To request a post-image, you setfullDocumentusing the same method.Pre-images are written to the
config.system.preimagescollection.The
config.system.preimagescollection may become large. To limit the collection size, you can setexpireAfterSecondstime for the pre-images as shown earlier.Pre-images are removed asynchronously by a background process.
Important
Backward-Incompatible Feature
Starting in MongoDB 6.0, if you are using document pre- and post-images
for change streams, you must disable
changeStreamPreAndPostImages for each collection using
the collMod command before you can downgrade to an earlier
MongoDB version.
Tip
For change stream events and output, see Change Events.
To watch a collection for changes, see
db.collection.watch().For complete examples with the change stream output, see Change Streams with Document Pre- and Post-Images.
Create Collection
Create a temperatureSensor collection that has
changeStreamPreAndPostImages enabled:
db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } )
Populate the temperatureSensor collection with temperature readings:
db.temperatureSensor.insertMany( [ { "_id" : 0, "reading" : 26.1 }, { "_id" : 1, "reading" : 25.9 }, { "_id" : 2, "reading" : 24.3 }, { "_id" : 3, "reading" : 22.4 }, { "_id" : 4, "reading" : 24.6 } ] )
The following sections show change stream examples for document pre- and
post-images that use the temperatureSensor collection.
Change Stream with Document Pre-Image
You use the fullDocumentBeforeChange: "whenAvailable" setting to
output the document pre-image, if available. The pre-image is the
document before it was replaced, updated, or deleted. There is no
pre-image for an inserted document.
The following example creates a change stream cursor for the
temperatureSensor collection using fullDocumentBeforeChange:
"whenAvailable":
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch( [], { fullDocumentBeforeChange: "whenAvailable" } )
The following example uses the cursor to check for new change stream events:
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) { if ( watchCursorFullDocumentBeforeChange.hasNext() ) { printjson( watchCursorFullDocumentBeforeChange.next() ); } }
In the example:
The
whileloop runs until the cursor is closed.hasNext()returnstrueif the cursor has documents.
The following example updates the reading field for a
temperatureSensor document:
db.temperatureSensor.updateOne( { _id: 2 }, { $set: { reading: 22.1 } } )
After the temperatureSensor document is updated, the change event
outputs the document pre-image in the fullDocumentBeforeChange
field. The pre-image contains the temperatureSensor document
reading field before it was updated. For example:
{ "_id" : { "_data" : "82624B21...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1649090957, 1), "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 2 }, "updateDescription" : { "updatedFields" : { "reading" : 22.1 }, "removedFields" : [ ], "truncatedArrays" : [ ] }, "fullDocumentBeforeChange" : { "_id" : 2, "reading" : 24.3 } }
Tip
For document update output details, see change stream update events.
For change stream output details, see Change Events.
Change Stream with Document Post-Image
You use the fullDocument: "whenAvailable" setting to output the
document post-image, if available. The post-image is the document after
it was inserted, replaced, or updated. There is no post-image for a
deleted document.
The following example creates a change stream cursor for the
temperatureSensor collection using fullDocument:
"whenAvailable":
watchCursorFullDocument = db.temperatureSensor.watch( [], { fullDocument: "whenAvailable" } )
The following example uses the cursor to check for new change stream events:
while ( !watchCursorFullDocument.isClosed() ) { if ( watchCursorFullDocument.hasNext() ) { printjson( watchCursorFullDocument.next() ); } }
In the example:
The
whileloop runs until the cursor is closed.hasNext()returnstrueif the cursor has documents.
The following example updates the reading field for a
temperatureSensor document:
db.temperatureSensor.updateOne( { _id: 1 }, { $set: { reading: 29.5 } } )
After the temperatureSensor document is updated, the change event
outputs the document post-image in the fullDocument field. The
post-image contains the temperatureSensor document reading field
after it was updated. For example:
{ "_id" : { "_data" : "8262474D...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1648840090, 1), "fullDocument" : { "_id" : 1, "reading" : 29.5 }, "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 1 }, "updateDescription" : { "updatedFields" : { "reading" : 29.5 }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Tip
For document update output details, see change stream update events.
For change stream output details, see Change Events.
Change Stream with Aggregation Pipeline Filter
Note
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.
The following operation opens a change stream cursor against the
data.sensors collection using an aggregation pipeline to
filter only insert events:
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
Iterate the cursor to check for new events. Use the
cursor.isClosed() method with the cursor.hasNext()
method to ensure the loop only exits if the change stream cursor is
closed and there are no objects remaining in the latest batch:
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
The change stream cursor only returns change events where the
operationType is insert. For complete documentation on
change stream output, see Change Events.
Resuming a Change Stream
Every document returned by a change stream cursor includes a resume
token as the _id field. To resume a change stream, pass the entire
_id document of the change event you want to resume from to
either the resumeAfter or startAfter option of
watch().
The following operation resumes a change stream cursor against the
data.sensors collection using a resume token. This
assumes that the operation that generated the resume token has not
rolled off the cluster's oplog.
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
Iterate the cursor to check for new events. Use the
cursor.isClosed() method with the cursor.hasNext()
method to ensure the loop only exits if the change stream cursor is
closed and there are no objects remaining in the latest batch:
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
See Resume a Change Stream for complete documentation on resuming a change stream.