Open Change Streams

In this guide, you can learn how to use a change stream to monitor real-time changes to your database. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a single collection, database, or deployment.

You can specify a set of aggregation operators to filter and transform the data your application receives. When connecting to a MongoDB deployment v6.0 or later, you can also configure the events to include the document data before and after the change.

Learn how to open and configure your change streams in the following sections:

  • Open a Change Stream

  • Apply Aggregation Operators to your Change Stream

  • Split Large Change Stream Events

  • Include Pre-images and Post-images

You can open a change stream to subscribe to specific types of data changes and produce change events in your application.

To open a change stream, call the watch() method on an instance of a MongoCollection, MongoDatabase, or MongoClient.

Important

Standalone MongoDB deployments don't support change streams because the feature requires a replica set oplog. To learn more about the oplog, see the Replica Set Oplog MongoDB Server manual page.

The object on which you call the watch() method determines the scope of events that the change stream listens for.

If you call watch() on a MongoCollection, the change stream monitors a collection.

If you call watch() on a MongoDatabase, the change stream monitors all collections in that database.

If you call watch() on a MongoClient, the change stream monitors all changes in the connected MongoDB deployment.
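
The scoping rules above can be modeled in a few lines of plain Java. This is only an illustrative sketch: the Scope enum and its covers() method are hypothetical names that do not exist in the driver, and the model ignores details such as system collections, which are not reported by change streams.

```java
/** Hypothetical model of change stream scope; not part of the driver API. */
enum Scope {
    COLLECTION, DATABASE, DEPLOYMENT;

    /**
     * Returns true if a stream opened at this scope on watchedDb.watchedColl
     * would observe a change made to eventDb.eventColl.
     */
    boolean covers(String watchedDb, String watchedColl, String eventDb, String eventColl) {
        switch (this) {
            case COLLECTION:
                // Only changes to the watched collection itself
                return watchedDb.equals(eventDb) && watchedColl.equals(eventColl);
            case DATABASE:
                // Changes to any collection in the watched database
                return watchedDb.equals(eventDb);
            default:
                // DEPLOYMENT: changes anywhere in the deployment
                return true;
        }
    }
}

class ScopeDemo {
    public static void main(String[] args) {
        // A collection-level stream on myDb.myColl does not see myDb.other
        System.out.println(Scope.COLLECTION.covers("myDb", "myColl", "myDb", "other")); // prints false
        // A database-level stream on myDb does
        System.out.println(Scope.DATABASE.covers("myDb", null, "myDb", "other"));       // prints true
    }
}
```

In practice, choosing the narrowest scope that covers the data you need keeps the number of events your application must process small.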

This example shows how to open a change stream on the myColl collection and print change stream events as they occur.

The driver stores change stream events in a variable of type ChangeStreamIterable. In the following example, we specify that the driver should populate the ChangeStreamIterable object with Document types. As a result, the driver stores individual change stream events as ChangeStreamDocument objects.

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
        System.out.println("Received a change: " + event));

An insert operation on the collection produces the following output:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

For a runnable example, see the Watch for Changes usage example page.

To learn more about the watch() method, see the API documentation for MongoCollection, MongoDatabase, and MongoClient.

You can pass an aggregation pipeline as a parameter to the watch() method to specify which change events the change stream receives.

To learn which aggregation operators your MongoDB Server version supports, see Modify Change Stream Output.

The following code example shows how you can apply an aggregation pipeline to configure your change stream to receive change events for only insert and update operations:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
        Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
        System.out.println("Received a change to the collection: " + event));

An update operation on the collection produces the following output:

Received a change to the collection: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

Starting in MongoDB 7.0, you can use the $changeStreamSplitLargeEvent aggregation stage to split events that exceed 16 MB into smaller fragments.

Use $changeStreamSplitLargeEvent only when strictly necessary. For example, use $changeStreamSplitLargeEvent if your application requires full document pre- or post-images, and generates events that exceed 16 MB.

The $changeStreamSplitLargeEvent stage returns the fragments sequentially. You can access the fragments by using a change stream cursor. Each fragment includes a SplitEvent object containing the following fields:

Field      Description
fragment   The index of the fragment, starting at 1
of         The total number of fragments that compose the split event

The following example modifies your change stream by using the $changeStreamSplitLargeEvent aggregation stage to split large events:

ChangeStreamIterable<Document> changeStream = collection.watch(
        Arrays.asList(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

Note

You can have only one $changeStreamSplitLargeEvent stage in your aggregation pipeline, and it must be the last stage in the pipeline.

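You can call the getSplitEvent() method on a change event returned by your change stream cursor to access the SplitEvent, as shown in the following example:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
// tryNext() returns null if no event is currently available
ChangeStreamDocument<Document> next = cursor.tryNext();
SplitEvent event = (next != null) ? next.getSplitEvent() : null;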

For more information about the $changeStreamSplitLargeEvent aggregation stage, see the $changeStreamSplitLargeEvent server documentation.
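
Because fragments arrive sequentially and carry the fragment and of values described above, an application can reassemble a split event by buffering fragments until the last one arrives. The following plain-Java sketch illustrates that bookkeeping. FragmentAssembler is a hypothetical helper, not a driver class, and it assumes each fragment can be represented as a map holding a subset of the event's top-level fields; in a real application you would read the two counters from the SplitEvent of each change event.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper that merges the fragments of one split event.
 * Each fragment is modeled as a map of top-level event fields (an assumption).
 */
class FragmentAssembler {
    private final Map<String, Object> merged = new LinkedHashMap<>();
    private int expectedNext = 1; // fragment indexes start at 1

    /**
     * Adds one fragment. Returns the merged event once the last fragment
     * (fragment == of) arrives, or null while fragments are still pending.
     */
    Map<String, Object> add(int fragment, int of, Map<String, Object> fields) {
        if (fragment != expectedNext || fragment > of) {
            throw new IllegalStateException(
                    "Expected fragment " + expectedNext + " but got " + fragment + " of " + of);
        }
        merged.putAll(fields);
        if (fragment < of) {
            expectedNext++;
            return null;
        }
        // Last fragment: hand back a copy and reset for the next split event
        Map<String, Object> complete = new LinkedHashMap<>(merged);
        merged.clear();
        expectedNext = 1;
        return complete;
    }

    public static void main(String[] args) {
        FragmentAssembler assembler = new FragmentAssembler();
        // First of two fragments: nothing to emit yet
        System.out.println(assembler.add(1, 2, Map.of("operationType", "update"))); // prints null
        // Second fragment completes the event
        Map<String, Object> event = assembler.add(2, 2, Map.of("fullDocument", "..."));
        System.out.println(event.keySet()); // prints [operationType, fullDocument]
    }
}
```

The sketch rejects out-of-order fragments instead of trying to reorder them, on the assumption that a gap indicates a problem the application should surface rather than hide.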

You can configure the change event to contain or omit the following data:

  • The pre-image, a document that represents the version of the document before the operation, if it exists

  • The post-image, a document that represents the version of the document after the operation, if it exists

Important

You can enable pre- and post-images on collections only if your deployment uses MongoDB v6.0 or later.

To receive change stream events that include a pre-image or post-image, you must perform the following actions:

  • Enable pre-images and post-images for the collection on your MongoDB deployment.

    Tip

    To learn how to enable pre- and post-images on your deployment, see Change Streams with Document Pre- and Post-Images in the Server manual.

    To learn how to instruct the driver to create a collection with pre-images and post-images enabled, see the Create a Collection with Pre-Image and Post-Images Enabled section.

  • Configure your change stream to retrieve the pre-images, the post-images, or both.

    Tip

    To configure your change stream to record the pre-image in change events, see the Pre-image Configuration Example.

    To configure your change stream to record the post-image in change events, see the Post-image Configuration Example.

To use the driver to create a collection with the pre-image and post-image options enabled, specify an instance of ChangeStreamPreAndPostImagesOptions and call the createCollection() method as shown in the following example:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

You can change the pre-image and post-image option in an existing collection by running the collMod command from the MongoDB Shell. To learn how to perform this operation, see the entry on collMod in the Server manual.

Warning

If you enabled pre-images or post-images on a collection, modifying these settings with collMod can cause existing change streams on that collection to fail.

The following code example shows how you can configure a change stream on the myColl collection to include the pre-image and output any change events:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
        .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
        System.out.println("Received a change: " + event));

The preceding example configures the change stream to use the FullDocumentBeforeChange.REQUIRED option. This option configures the change stream to require pre-images for replace, update, and delete change events. If the pre-image is not available, the driver raises an error.

Suppose you update the value of the amount field in a document from 150 to 2000. This change event produces the following output:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

For a list of options, see the FullDocumentBeforeChange API documentation.
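
To make the difference between the pre-image options concrete, the following plain-Java sketch models how each setting could behave when a pre-image is or is not available. It is a simplified illustration, not driver code: PreImageMode is a hypothetical stand-in for three of the FullDocumentBeforeChange values, PreImageModel.resolve() is an invented method, and pre-images are represented as plain strings.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for three of the driver's FullDocumentBeforeChange values. */
enum PreImageMode { OFF, WHEN_AVAILABLE, REQUIRED }

class PreImageModel {
    // Per the description above, pre-images apply to these change event types
    private static final Set<String> PRE_IMAGE_OPS = Set.of("replace", "update", "delete");

    /**
     * Returns the pre-image to attach to an event, if any.
     * storedPreImage is null when no pre-image is available.
     */
    static Optional<String> resolve(PreImageMode mode, String operationType, String storedPreImage) {
        if (mode == PreImageMode.OFF || !PRE_IMAGE_OPS.contains(operationType)) {
            return Optional.empty();
        }
        if (storedPreImage == null && mode == PreImageMode.REQUIRED) {
            // Models the error raised when a required pre-image is missing
            throw new IllegalStateException("Pre-image required but not available");
        }
        return Optional.ofNullable(storedPreImage);
    }

    public static void main(String[] args) {
        // Pre-image available: attached to the event
        System.out.println(resolve(PreImageMode.WHEN_AVAILABLE, "update", "{amount: 150}")); // prints Optional[{amount: 150}]
        // Pre-image missing: WHEN_AVAILABLE tolerates it
        System.out.println(resolve(PreImageMode.WHEN_AVAILABLE, "update", null));            // prints Optional.empty
        // Pre-image missing: REQUIRED fails
        try {
            resolve(PreImageMode.REQUIRED, "update", null);
        } catch (IllegalStateException e) {
            System.out.println("Error: " + e.getMessage());
        }
    }
}
```

Under this model, WHEN_AVAILABLE suits applications that can tolerate a missing pre-image, while REQUIRED suits applications that would rather fail than process an event without one.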

The following code example shows how you can configure a change stream on the myColl collection to include the post-image and output any change events:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
        .fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
        System.out.println("Received a change: " + event));

The preceding example configures the change stream to use the FullDocument.WHEN_AVAILABLE option. This option configures the change stream to return the post-image of the modified document for replace and update change events, if it's available.

Suppose you update the value of the color field in a document from "purple" to "pink". The change event produces the following output:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=pink, ...}},
updatedFields={"color": pink},
...
}

For a list of options, see the FullDocument API documentation.
