Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.
Sample Data
The examples in this guide use the sample_restaurants.restaurants collection from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started guide.
Important
Project Reactor Library
This guide uses the Project Reactor library to consume Publisher instances returned by the Java Reactive Streams driver methods. To learn more about the Project Reactor library and how to use it, see Getting Started in the Reactor documentation. To learn more about how we use Project Reactor library methods in this guide, see the Write Data to MongoDB guide.
Open a Change Stream
To open a change stream, call the watch() method. The instance on which you call the method determines the scope of events that the change stream listens for. You can call the watch() method on instances of the following classes:
- MongoClient: To monitor all changes in the MongoDB deployment
- MongoDatabase: To monitor changes in all collections in the database
- MongoCollection: To monitor changes in the collection
The following example opens a change stream on the restaurants collection and outputs the changes as they occur:
    // Opens a change stream and prints the changes as they're received
    ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch();

    Flux.from(changeStreamPublisher)
            .doOnNext(change -> System.out.println("Received change: " + change))
            .blockLast();
To begin watching for changes, run the application. Then, in a separate application or shell, perform a write operation on the restaurants collection. Updating a document that has a "name" field value of "Blarney Castle" results in the following change stream output:
Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
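To make the scoping rule for watch() concrete, the following sketch models which change events each kind of stream would see. It is a simplified, driver-free illustration: the WatchScopeSketch class, the Level enum, and the inScope() method are hypothetical names invented for this example, not part of the MongoDB driver API, and the model ignores details such as system collections.

```java
/** Simplified model of change stream scoping; not part of the MongoDB driver API. */
class WatchScopeSketch {

    /** Hypothetical labels for the three places where watch() can be called. */
    enum Level { DEPLOYMENT, DATABASE, COLLECTION }

    /**
     * Returns true if a change in eventDb.eventColl falls inside a stream opened
     * at the given level on watchedDb (and watchedColl for collection streams).
     */
    static boolean inScope(Level level, String watchedDb, String watchedColl,
                           String eventDb, String eventColl) {
        switch (level) {
            case DEPLOYMENT:
                // A deployment-level stream is not tied to any one database
                return true;
            case DATABASE:
                // A database-level stream covers every collection in that database
                return watchedDb.equals(eventDb);
            case COLLECTION:
                // A collection-level stream covers only that one collection
                return watchedDb.equals(eventDb) && watchedColl.equals(eventColl);
            default:
                throw new IllegalArgumentException("Unknown level: " + level);
        }
    }

    public static void main(String[] args) {
        // A change to sample_restaurants.neighborhoods, seen from three streams
        System.out.println(inScope(Level.COLLECTION, "sample_restaurants", "restaurants",
                "sample_restaurants", "neighborhoods")); // prints false
        System.out.println(inScope(Level.DATABASE, "sample_restaurants", null,
                "sample_restaurants", "neighborhoods")); // prints true
        System.out.println(inScope(Level.DEPLOYMENT, null, null,
                "sample_restaurants", "neighborhoods")); // prints true
    }
}
```

In this model, a stream opened on the restaurants collection does not report a write to another collection in the same database, while database-level and deployment-level streams do.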
Modify the Change Stream Output
You can pass an aggregation pipeline as a parameter to the watch() method to modify the change stream output. This parameter allows you to watch for only specified change events.
You can specify the following aggregation stages in the pipeline parameter:
- $addFields
- $match
- $project
- $replaceRoot
- $replaceWith
- $redact
- $set
- $unset
The following example passes an aggregation pipeline to a change stream to record only update operations:
    // Creates a change stream pipeline
    List<Bson> pipeline = Arrays.asList(
            Aggregates.match(Filters.eq("operationType", "update"))
    );

    // Opens a change stream and prints the changes as they're received
    ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline);

    Flux.from(changeStreamPublisher)
            .doOnNext(change -> System.out.println("Received change: " + change))
            .blockLast();
To learn more about modifying your change stream output, see the Modify Change Stream Output section in the MongoDB Server manual.
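When you read the Server manual, it can help to know roughly what a $match pipeline looks like in JSON form. The following driver-free sketch renders a one-stage pipeline that keeps several operation types by using the $in operator. The MatchStageSketch class and matchOperationTypes() method are hypothetical helpers that build the text by hand for illustration only. In application code, you would construct the stage with the Aggregates and Filters builders, as in the preceding example.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hand-built JSON rendering of a $match pipeline; for illustration only. */
class MatchStageSketch {

    /** Renders a one-stage pipeline that keeps only the given operation types. */
    static String matchOperationTypes(List<String> operationTypes) {
        // Quote each operation type and join them into a JSON array body
        String quoted = operationTypes.stream()
                .map(type -> "\"" + type + "\"")
                .collect(Collectors.joining(", "));
        return "[{\"$match\": {\"operationType\": {\"$in\": [" + quoted + "]}}}]";
    }

    public static void main(String[] args) {
        // prints [{"$match": {"operationType": {"$in": ["insert", "update"]}}}]
        System.out.println(matchOperationTypes(List.of("insert", "update")));
    }
}
```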
Modify watch() Behavior
You can chain methods to the watch() method that represent options you can use to configure the change stream operation. If you don't specify any options, the driver does not customize the operation.
The following table describes the methods that you can chain to watch() to customize its behavior:
| Option | Description |
|---|---|
| fullDocument() | Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
| fullDocumentBeforeChange() | Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
| | Directs |
| | Directs |
| | Directs |
| | Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds. |
| | Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the |
| | Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. By default, the driver sets this value to A |
| | Specifies the collation to use for the change stream cursor. |
| | Attaches a comment to the operation. |
Include Pre-Images and Post-Images
Important
You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.
By default, when you perform an operation on a collection, the corresponding change event includes only the delta of the fields modified by that operation. To see the full document before or after a change, chain the fullDocumentBeforeChange() or the fullDocument() method to the watch() method.
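To illustrate how the delta relates to the full document, the following sketch applies the updated and removed fields from an update event to a copy of the document as it was before the change. This is a simplified model and not driver code: the UpdateDeltaSketch class and applyDelta() method are hypothetical, documents are plain maps, the sample pre-image values are made up, and only top-level fields are handled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of how an update delta relates a pre-image to a post-image. */
class UpdateDeltaSketch {

    /** Applies updated and removed top-level fields to a copy of the pre-image. */
    static Map<String, Object> applyDelta(Map<String, Object> preImage,
                                          Map<String, Object> updatedFields,
                                          List<String> removedFields) {
        // Copy the pre-image so that the original map is left untouched
        Map<String, Object> postImage = new LinkedHashMap<>(preImage);
        postImage.putAll(updatedFields);
        for (String field : removedFields) {
            postImage.remove(field);
        }
        return postImage;
    }

    public static void main(String[] args) {
        // Hypothetical pre-image; the field values are invented for this example
        Map<String, Object> preImage = new LinkedHashMap<>();
        preImage.put("name", "Blarney Castle");
        preImage.put("cuisine", "Irish");

        Map<String, Object> postImage =
                applyDelta(preImage, Map.of("cuisine", "Traditional Irish"), List.of());

        // prints {name=Blarney Castle, cuisine=Traditional Irish}
        System.out.println(postImage);
    }
}
```

A default change event carries only the equivalent of the updatedFields and removedFields arguments. The pre-image and post-image options make the other two documents available on the event as well.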
The pre-image is the full version of a document before a change. To include the pre-image in the change stream event, pass one of the following values to the fullDocumentBeforeChange() method:
- FullDocumentBeforeChange.WHEN_AVAILABLE: The change event includes a pre-image of the modified document for change events only if the pre-image is available.
- FullDocumentBeforeChange.REQUIRED: The change event includes a pre-image of the modified document for change events. If the pre-image is not available, the driver raises an error.
The post-image is the full version of a document after a change. To include the post-image in the change stream event, pass one of the following values to the fullDocument() method:
- FullDocument.UPDATE_LOOKUP: The change event includes a copy of the entire changed document from some time after the change.
- FullDocument.WHEN_AVAILABLE: The change event includes a post-image of the modified document for change events only if the post-image is available.
- FullDocument.REQUIRED: The change event includes a post-image of the modified document for change events. If the post-image is not available, the driver raises an error.
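The difference between the WHEN_AVAILABLE and REQUIRED settings can be sketched as a small decision function. The following model is an assumption-laden illustration, not the driver's implementation: the ImageModeSketch class, its Mode enum, and the resolveImage() method are hypothetical stand-ins, and images are represented as plain strings.

```java
import java.util.Optional;

/** Simplified model of the WHEN_AVAILABLE and REQUIRED image settings. */
class ImageModeSketch {

    /** Hypothetical stand-in for the two settings; not the driver's enum types. */
    enum Mode { WHEN_AVAILABLE, REQUIRED }

    /**
     * Returns the image to attach to the change event, or null when no image
     * is attached. Throws if the image is required but missing.
     */
    static String resolveImage(Mode mode, Optional<String> storedImage) {
        if (storedImage.isPresent()) {
            return storedImage.get();
        }
        if (mode == Mode.REQUIRED) {
            // Models the error raised when a required image is not available
            throw new IllegalStateException("Image is required but not available");
        }
        // WHEN_AVAILABLE: the event is still delivered, without an image
        return null;
    }

    public static void main(String[] args) {
        // prints null
        System.out.println(resolveImage(Mode.WHEN_AVAILABLE, Optional.empty()));
    }
}
```

In this model, WHEN_AVAILABLE trades completeness for resilience, because events keep flowing when an image is missing. REQUIRED surfaces the missing image as an error instead.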
The following example opens a change stream on a collection and includes the post-image of updated documents by chaining the fullDocument() method to the watch() method:
    // Creates a change stream pipeline
    List<Bson> pipeline = Arrays.asList(
            Aggregates.match(Filters.eq("operationType", "update"))
    );

    // Opens a change stream and prints the changes as they're received, including the full
    // document after the update
    ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline)
            .fullDocument(FullDocument.UPDATE_LOOKUP);

    Flux.from(changeStreamPublisher)
            .doOnNext(change -> System.out.println("Received change: " + change))
            .blockLast();
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
Additional Information
To learn more about change streams, see Change Streams in the MongoDB Server manual.
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: