db.watch()
Definition
db.watch( pipeline, options )
For replica sets and sharded clusters only
Opens a change stream cursor for a database to report on all its non-system collections.

Parameter: pipeline
Type: array
Description: Optional. An Aggregation Pipeline consisting of one or more of the aggregation stages that change streams support.
Specify a pipeline to filter/modify the change events output.
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.

Parameter: options
Type: document
Description: Optional. Additional options that modify the behavior of db.watch().

The options document can contain the following fields and values:

Field: resumeAfter
Type: document
Description: Optional. Specifies a resume token as the logical starting point for the change stream. Cannot be used to resume the change stream after an invalidate event.
resumeAfter is mutually exclusive with startAfter and startAtOperationTime.

Field: startAfter
Type: document
Description: Optional. Specifies a resume token as the logical starting point for the change stream. Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.
startAfter is mutually exclusive with resumeAfter and startAtOperationTime.

Field: fullDocument
Type: string
Description: Optional. By default, db.watch() returns the delta of those fields modified by an update operation, instead of the entire updated document.
Set fullDocument to "updateLookup" to direct db.watch() to look up the most current majority-committed version of the updated document. db.watch() returns a fullDocument field with the document lookup in addition to the updateDescription delta.
Starting in MongoDB 6.0, you can set fullDocument to:
"whenAvailable" to output the document post-image, if available, after the document was inserted, replaced, or updated.
"required" to output the document post-image after the document was inserted, replaced, or updated. Raises an error if the post-image is not available.

Field: fullDocumentBeforeChange
Type: string
Description: Optional. Default is "off".
Starting in MongoDB 6.0, you can use the new fullDocumentBeforeChange field and set it to:
"whenAvailable" to output the document pre-image, if available, before the document was replaced, updated, or deleted.
"required" to output the document pre-image before the document was replaced, updated, or deleted. Raises an error if the pre-image is not available.
"off" to suppress the document pre-image. "off" is the default.

Field: batchSize
Type: int
Description: Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.
Has the same functionality as cursor.batchSize().

Field: maxAwaitTimeMS
Type: int
Description: Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.
Defaults to 1000 milliseconds.

Field: collation
Type: document
Description: Optional. Pass a collation document to specify a collation for the change stream cursor.
If omitted, defaults to simple binary comparison.

Field: startAtOperationTime
Type: Timestamp
Description: Optional. The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. To check the time range of the oplog, see rs.printReplicationInfo().
startAtOperationTime is mutually exclusive with resumeAfter and startAfter.

Returns: A cursor over the change event documents. See Change Events for examples of change event documents.
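Because resumeAfter, startAfter, and startAtOperationTime are mutually exclusive, it can help to check an options document before passing it to db.watch(). The following is a minimal sketch under stated assumptions, not part of the db.watch() API: buildWatchOptions is a hypothetical helper name, and savedToken in the usage comment is a placeholder for a resume token you stored earlier.

```javascript
// Hypothetical helper (not part of mongosh): assembles an options document
// for db.watch() and rejects more than one starting-point field.
function buildWatchOptions(fields) {
  const startingPoints = ["resumeAfter", "startAfter", "startAtOperationTime"];
  const chosen = startingPoints.filter((name) => fields[name] !== undefined);
  if (chosen.length > 1) {
    throw new Error("Mutually exclusive options: " + chosen.join(", "));
  }
  // Copy only the fields described for the options document.
  const allowed = startingPoints.concat([
    "fullDocument", "fullDocumentBeforeChange",
    "batchSize", "maxAwaitTimeMS", "collation",
  ]);
  const options = {};
  for (const name of allowed) {
    if (fields[name] !== undefined) options[name] = fields[name];
  }
  return options;
}

// Usage in mongosh, assuming savedToken holds a previously stored resume token:
//   const options = buildWatchOptions({ resumeAfter: savedToken, fullDocument: "updateLookup" });
//   const watchCursor = db.watch([], options);
```

Unknown fields are dropped rather than rejected in this sketch; a stricter variant could throw on them instead.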
Availability
Deployment
db.watch() is available for replica sets and sharded clusters:
For a replica set, you can issue db.watch() on any data-bearing member.
For a sharded cluster, you must issue db.watch() on a mongos instance.
Storage Engine
You can only use db.watch() with the WiredTiger storage engine.
Read Concern majority Support
Change streams are available regardless of the "majority" read concern support; that is, read concern majority support can be either enabled (default) or disabled to use change streams.
Behavior
You cannot run db.watch() on the admin, local, or config database.
db.watch() only notifies on data changes that have persisted to a majority of data-bearing members.
The change stream cursor remains open until one of the following occurs:
The cursor is explicitly closed.
An invalidate event occurs; for example, a collection drop or rename.
The connection to the MongoDB deployment closes or times out. See Cursor Behaviors for more information.
If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
You can run db.watch() for a database that does not exist. However, if the database is later created and then dropped, the change stream cursor closes.
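The restriction on the admin, local, and config databases can be checked before a stream is opened. This is a minimal sketch, assuming a mongosh-style db handle is passed in; watchUserDatabase and UNWATCHABLE are hypothetical names introduced here, not mongosh features.

```javascript
// Hypothetical helper (not part of mongosh): refuses the databases that
// db.watch() cannot be run on, then opens the stream on a sibling database.
const UNWATCHABLE = ["admin", "local", "config"];

function watchUserDatabase(db, name, pipeline = [], options = {}) {
  if (UNWATCHABLE.includes(name)) {
    throw new Error("db.watch() cannot run on the " + name + " database");
  }
  // getSiblingDB() returns a handle for the named database.
  return db.getSiblingDB(name).watch(pipeline, options);
}

// Usage in mongosh:
//   const watchCursor = watchUserDatabase(db, "hr");
```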
Resumability
Unlike the MongoDB Drivers, mongosh does not automatically attempt to resume a change stream cursor after an error. The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.
db.watch() uses information stored in the oplog to produce the change event description and generate a resume token associated with that operation. If the operation identified by the resume token passed to the resumeAfter or startAfter option has already dropped off the oplog, db.watch() cannot resume the change stream.
See Resume a Change Stream for more information on resuming a change stream.
Note
You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Instead, you can use startAfter to start a new change stream after an invalidate event.
If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
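Since mongosh does not resume a stream on its own, a script has to choose the restart option itself. The sketch below is one way to do that, assuming (per the Change Events format) that each event's _id field holds its resume token and that an invalidate event has operationType "invalidate". restartOptions and reopenStream are hypothetical helper names.

```javascript
// Hypothetical helper (not part of mongosh): picks the option used to reopen
// a change stream, based on the last event that was processed.
// Assumption: the event's _id field holds its resume token.
function restartOptions(lastEvent) {
  if (!lastEvent) return {}; // nothing seen yet: open a fresh stream
  if (lastEvent.operationType === "invalidate") {
    // resumeAfter cannot be used after an invalidate event.
    return { startAfter: lastEvent._id };
  }
  return { resumeAfter: lastEvent._id };
}

// Reopens the stream on the given database handle with the chosen option.
function reopenStream(db, lastEvent, pipeline = []) {
  return db.watch(pipeline, restartOptions(lastEvent));
}
```

Reopening still fails if the operation behind the token has already dropped off the oplog, so a caller would need its own fallback for that case.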
Full Document Lookup of Update Operations
By default, the change stream cursor returns specific field changes/deltas for update operations. You can also configure the change stream to look up and return the current majority-committed version of the changed document. Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.
Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.
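To make the difference between the delta and the looked-up document concrete, the sketch below summarizes an update event. It assumes the change event field names operationType, updateDescription (with updatedFields and removedFields), and fullDocument; describeUpdate is a hypothetical helper and the sample event is invented for illustration.

```javascript
// Hypothetical helper: summarizes an update change event.
function describeUpdate(event) {
  if (event.operationType !== "update") return null;
  const delta = event.updateDescription || {};
  return {
    changedFields: Object.keys(delta.updatedFields || {}),
    removedFields: delta.removedFields || [],
    // Set only when the stream was opened with a fullDocument option
    // and a document was available to return.
    current: event.fullDocument == null ? null : event.fullDocument,
  };
}

// Invented sample: the delta reports salary 5200, but a later write had
// already changed the document by the time the lookup ran.
const sample = {
  operationType: "update",
  updateDescription: { updatedFields: { salary: 5200 }, removedFields: ["temp"] },
  fullDocument: { _id: 1, name: "Ana", salary: 5400 },
};
const summary = describeUpdate(sample);
```

In the sample, the delta and the looked-up document disagree on salary, which is the situation the paragraph above describes.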
Access Control
When running with access control, the user must have the find and changeStream privilege actions on the database resource. That is, a user must have a role that grants the following privilege:
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream"] }
The built-in read role provides the appropriate privileges.
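If the built-in read role grants more than you want, a custom role with only the privilege shown above is one option. This is a sketch, not a recommended policy: changeStreamRole is a hypothetical helper, and the role name it generates is a placeholder chosen here.

```javascript
// Hypothetical helper: builds a role document granting find and changeStream
// on every collection of one database.
function changeStreamRole(dbName) {
  return {
    role: "watch_" + dbName, // placeholder role name
    privileges: [
      { resource: { db: dbName, collection: "" }, actions: ["find", "changeStream"] },
    ],
    roles: [], // no inherited roles
  };
}

// Usage in mongosh, connected as a user who is allowed to create roles:
//   db.getSiblingDB("hr").createRole(changeStreamRole("hr"));
```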
Cursor Iteration
MongoDB provides multiple ways to iterate on a cursor.
The cursor.hasNext() method blocks and waits for the next event. To monitor the watchCursor cursor and iterate over the events, use hasNext() like this:
while (!watchCursor.isClosed()) {
  if (watchCursor.hasNext()) {
    firstChange = watchCursor.next();
    break;
  }
}
The cursor.tryNext() method is non-blocking. To monitor the watchCursor cursor and iterate over the events, use tryNext() like this:
while (!watchCursor.isClosed()) {
  let next = watchCursor.tryNext();
  while (next !== null) {
    printjson(next);
    next = watchCursor.tryNext();
  }
}
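The tryNext() loop above runs until the cursor closes. When a script needs to regain control between polls, a bounded variant can help. The sketch below assumes only the isClosed() and tryNext() cursor methods used above; drainAvailable is a hypothetical helper name.

```javascript
// Hypothetical helper: handles the events tryNext() can return right now,
// stopping after maxEvents so the caller regains control.
function drainAvailable(cursor, maxEvents, handle) {
  let handled = 0;
  while (handled < maxEvents && !cursor.isClosed()) {
    const event = cursor.tryNext();
    if (event === null) break; // nothing available at the moment
    handle(event);
    handled += 1;
  }
  return handled; // number of events passed to handle
}

// Usage in mongosh:
//   drainAvailable(watchCursor, 100, printjson);
```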
Example
The following operation in mongosh opens a change stream cursor on the hr database. The returned cursor reports on data changes to all the non-system collections in that database.
watchCursor = db.getSiblingDB("hr").watch()
Iterate the cursor to check for new events. Use the cursor.isClosed() method with the cursor.tryNext() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
while (!watchCursor.isClosed()) {
  let next = watchCursor.tryNext();
  while (next !== null) {
    printjson(next);
    next = watchCursor.tryNext();
  }
}
For complete documentation on change stream output, see Change Events.
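The example above watches every change in the hr database. The pipeline parameter can narrow that output. The following sketch assumes the change event fields operationType, ns.coll, and documentKey; filterPipeline is a hypothetical helper and "employees" is a placeholder collection name.

```javascript
// Hypothetical helper: builds a pipeline that keeps only the listed operation
// types for one collection of the watched database.
function filterPipeline(collectionName, operationTypes) {
  return [
    { $match: { "ns.coll": collectionName, operationType: { $in: operationTypes } } },
    // Inclusion projection: _id is kept unmodified by default, which matters
    // because the pipeline must not modify an event's _id field.
    { $project: { operationType: 1, ns: 1, documentKey: 1, fullDocument: 1 } },
  ];
}

// Usage in mongosh:
//   watchCursor = db.getSiblingDB("hr").watch(filterPipeline("employees", ["insert", "update"]));
```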
Note
You cannot use isExhausted() with change streams.