Docs Menu

Docs HomeDevelop ApplicationsMongoDB DriversJava

Watch for Changes

You can keep track of changes to data in MongoDB, such as changes to a collection, database, or deployment, by opening a change stream. A change stream allows applications to watch for changes to data and react to them. The change stream returns change event documents when changes occur. You can open a change stream by calling the watch() method on a MongoCollection, MongoDatabase, or MongoClient object:

ChangeStreamIterable<Document> changeStream = database.watch();

The watch() method optionally takes an aggregation pipeline which consists of an array of stages as the first parameter to filter and transform the change event output as follows:

List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

The watch() method returns an instance of ChangeStreamIterable, a class that offers several methods to access, organize, and traverse the results. ChangeStreamIterable also inherits methods from its parent class, MongoIterable which implements the core Java interface Iterable.

You can call forEach() on the ChangeStreamIterable to handle events as they occur, or you can use the iterator() method which returns a MongoCursor instance that you can use to traverse the results.

You can call methods on the MongoCursor such as hasNext() to check whether additional results exist, next() to return the next document in the collection, or tryNext(), to immediately return either the next available element in the change stream or null. Unlike the MongoCursor returned by other queries, a MongoCursor associated with a change stream waits until a change event arrives before returning a result from next(). As a result, calls to next() using a change stream's MongoCursor never throw a java.util.NoSuchElementException.

To configure options for processing the documents returned from the change stream, use member methods of the ChangeStreamIterable object returned by watch(). See the link to the ChangeStreamIterable API documentation at the bottom of this example for more details on the available methods.

To capture events from a change stream, call the forEach() method with a callback function as shown below:

changeStream.forEach(event -> System.out.println("Change observed: " + event));

The callback function triggers when a change event is emitted. You can specify logic in the callback to process the event document when it is received.

Important

forEach() blocks the current thread

Calls to forEach() block the current thread as long as the corresponding change stream listens for events. If your program needs to continue executing other logic, such as processing requests or responding to user input, consider creating and listening to your change stream in a separate thread.

Note

For update operation change events, change streams only return the modified fields by default rather than the entire updated document. You can configure your change stream to also return the most current version of the document by calling the fullDocument() member method of the ChangeStreamIterable object with the value FullDocument.UPDATE_LOOKUP as follows:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

The following example uses two separate applications to demonstrate how to listen for changes using a change stream:

  • The first application, named Watch, opens a change stream on the movies collection in the sample_mflix database. Watch uses an aggregation pipeline to filter changes based on operationType so that it only receives insert and update events (deletes are excluded by omission). Watch uses a callback to receive and print the filtered change events that occur on the collection.

  • The second application, named WatchCompanion, inserts a single document into the movies collection in the sample_mflix database. Next, WatchCompanion updates the document with a new field value. Finally, WatchCompanion deletes the document.

First, run Watch to open the change stream on the collection and define a callback on the change stream using the forEach() method. While Watch is running, run WatchCompanion to generate change events by performing changes to the collection.

Note

This example connects to an instance of MongoDB using a connection URI. To learn more about connecting to your MongoDB instance, see the connection guide.

Watch:

package usage.examples;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// variables referenced in a lambda must be final; final array gives us a mutable integer
final int[] numberOfEvents = {0};
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion:

package usage.examples;
import java.util.Arrays;
import org.bson.Document;
import org.bson.types.ObjectId;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

If you run the preceding applications in sequence, you should see output from the Watch application that is similar to the following. Only the insert and update operations are printed, since the aggregation pipeline filters out the delete operation:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825EC..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825EC..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null
}

You should also see output from the WatchCompanion application that is similar to the following:

Success! Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

Tip

Legacy API

If you are using the legacy API, see our FAQ page to learn what changes you need to make to this code example.

For additional information on the classes and methods mentioned on this page, see the following resources:

←  Perform Bulk OperationsCount Documents →