Java - Change Streams
- Update Java Driver to 4.2.2.
- Added Client Side Field Level Encryption example.
- Update Java Driver to 4.1.1.
Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, an application can also filter for specific changes or transform the notifications at will.
I will use the same repository as usual in this series. If you don't have a copy of it yet, you can clone it or just update it if you already have it:
In this blog post, I will be working on the file called ChangeStreams.java, but Change Streams are super easy to work with.
For each example, you will need to start 2 Java programs in the correct order if you want to reproduce my examples.
- The first program is always the one that contains the Change Streams code.
- The second one will be one of the Java programs we already used in this Java blog post series. You can find them in the GitHub repository. They will generate MongoDB operations that we will observe in the Change Streams output.
Let's start with the simplest Change Stream we can make:
As you can see, all we need is myCollection.watch()! That's it.
This returns a ChangeStreamIterable which, as indicated by its name, can be iterated to return our change events. Here, I'm iterating over my Change Stream to print my change event documents in the Java standard output.
I can also simplify this code like this:
I will reuse this functional interface in my following examples to ease the reading.
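As a minimal sketch of both forms, assuming the 4.x sync driver is on the classpath: the connection string, the sample_training database, the grades collection, and the names BasicChangeStream and printEvent are assumptions of mine, and I'm using a plain Document collection instead of the Grade POJO to keep the sketch self-contained. Keep in mind that Change Streams need a replica set or a sharded cluster, not a standalone server.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

import java.util.function.Consumer;

class BasicChangeStream {

    // Reusable callback (hypothetical name): prints every change event it receives.
    static Consumer<ChangeStreamDocument<Document>> printEvent() {
        return System.out::println;
    }

    public static void main(String[] args) {
        // Assumption: the connection string is the first argument, else a local replica set member.
        String uri = args.length > 0 ? args[0] : "mongodb://localhost:27017";
        try (MongoClient client = MongoClients.create(uri)) {
            // Assumption: database and collection names.
            MongoCollection<Document> grades =
                    client.getDatabase("sample_training").getCollection("grades");

            // Explicit form:
            //   grades.watch().forEach(event -> System.out.println(event));
            // Simplified form with the reusable Consumer. This call blocks and
            // prints events until the program is stopped.
            grades.watch().forEach(printEvent());
        }
    }
}
```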
To run this example:
- Uncomment only the example 1 from the ChangeStreams.java file and start it in your IDE or a dedicated console using Maven in the root of your project.
- Start MappingPOJO.java in another console or in your IDE.
In MappingPOJO, we are doing 4 MongoDB operations:
- I'm creating a new Grade document,
- I'm searching for this Grade document,
- I'm replacing this Grade document entirely,
- and finally, I'm deleting this Grade document.
This is confirmed in the standard output of that program.
Let's check what we have in the standard output of the Change Stream program.
As you can see, only 3 operations appear in the Change Stream:
- insert,
- replace,
- delete.
This was expected because the find() operation only reads a document from MongoDB. It doesn't change anything, so it doesn't generate an event in the Change Stream.
Now that we are done with the basic example, let's explore some features of Change Streams.
Terminate the Change Stream program we started earlier and let's move on.
Now let's do the same thing, but let's imagine that we are only interested in insert and delete operations.
As you can see here, I'm using the aggregation pipeline feature of Change Streams to filter down the change events I want to process.
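To make the filtering concrete, here is a minimal sketch of such a pipeline, built with the driver's Aggregates and Filters helpers. The class and method names are hypothetical. The sketch only builds and prints the $match stage, so it runs without a server; with a collection at hand, the list would be passed to watch(...).

```java
import com.mongodb.MongoClientSettings;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class InsertDeleteFilter {

    // Pipeline with a single $match stage keeping only insert and delete events.
    static List<Bson> insertAndDeleteOnly() {
        Bson match = Aggregates.match(
                Filters.in("operationType", Arrays.asList("insert", "delete")));
        return Collections.singletonList(match);
    }

    public static void main(String[] args) {
        // Render the stage as JSON to inspect it; no server is contacted here.
        BsonDocument stage = insertAndDeleteOnly().get(0)
                .toBsonDocument(BsonDocument.class, MongoClientSettings.getDefaultCodecRegistry());
        System.out.println(stage.toJson());

        // With a MongoCollection named grades (assumption), the stream would be opened as:
        //   grades.watch(insertAndDeleteOnly()).forEach(System.out::println);
    }
}
```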
Uncomment the example 2 in ChangeStreams.java and execute the program followed by MappingPOJO.java, just like we did earlier.
Here are the change events I'm receiving.
This time, I'm only getting 2 events: insert and delete. The replace event has been filtered out compared to the first example.
Same as earlier, I'm filtering my change stream to keep only the update operations this time.
This time, follow these steps:
- uncomment the example 3 in the same file,
- if you never ran Create.java, run it. We are going to use these new documents in the next step,
- start Update.java in another console.
In your change stream console, you should see 13 update events. Here is the first one:
As you can see, we are retrieving our update operation in the updateDescription field, but we are only getting the difference with the previous version of this document.
The full document is null because, by default, MongoDB only sends the difference to avoid overloading the change stream with potentially useless information.
Let's see how we can change this behavior in the next example.
For this part, uncomment the example 4 from ChangeStreams.java and execute the programs as above.
I added the option UPDATE_LOOKUP this time so we can also retrieve the entire document during an update operation.
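A hedged sketch of what this could look like: a pipeline that keeps only update events, combined with the FullDocument.UPDATE_LOOKUP option. The class and method names are hypothetical, and the collection is typed as Document for simplicity.

```java
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.Collections;
import java.util.List;

class UpdatesWithFullDocument {

    // Builds (but does not start) a change stream limited to update events,
    // asking the server to look up the full document for each of them.
    static ChangeStreamIterable<Document> watchUpdates(MongoCollection<Document> collection) {
        List<Bson> pipeline = Collections.singletonList(
                Aggregates.match(Filters.eq("operationType", "update")));
        return collection.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP);
    }

    // Iterating is what actually opens the stream; this call blocks.
    static void printUpdates(MongoCollection<Document> collection) {
        watchUpdates(collection).forEach(event -> {
            System.out.println("delta:         " + event.getUpdateDescription());
            System.out.println("full document: " + event.getFullDocument());
        });
    }
}
```

Note that with UPDATE_LOOKUP the full document is fetched when the event is processed by the server, so it is the most current majority-committed version and may already include changes made after the update described by the event.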
Let's see again the first update in my change stream:
The Update.java program updates a made-up field "comments" that doesn't exist in my POJO Grade, which represents the original schema for this collection. Thus, the field doesn't appear in the output as it's not mapped.
If I want to see this comments field, I can use a MongoCollection that is not automatically mapped to my POJO.
Then this is what I get in my change stream:
I have shortened the comments field to keep it readable, but it contains the same comment 14 times in my case.
In this final example 5, I have simulated an error and I'm restarting my Change Stream from a resumeToken I got from a previous operation in my Change Stream.
It's important to note that a change stream will resume itself automatically in the face of an "incident". Generally, the only reason that an application needs to restart the change stream manually from a resume token is if there is an incident in the application itself rather than the change stream (e.g. an operator has decided that the application needs to be restarted).
This final example works the same way as earlier: uncomment the part 5 (which is just calling the method above) and start the programs as before.
This is the output you should get:
As you can see here, I was able to stop reading my Change Stream and, from the resumeToken I collected earlier, I can start a new Change Stream from this point in time.
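The idea can be sketched roughly as follows, assuming a Document-typed collection. The class and method names are hypothetical and this is not necessarily how the method in the repository is written: read one event, keep its resume token, close the stream, then open a new one with resumeAfter.

```java
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;

class ResumeExample {

    // Opens a change stream, waits for one event, and returns its resume token.
    // The cursor is closed when leaving the try block, which simulates the "incident".
    static BsonDocument readOneAndGetToken(MongoCollection<Document> collection) {
        try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor =
                     collection.watch().cursor()) {
            ChangeStreamDocument<Document> event = cursor.next(); // blocks until an event arrives
            System.out.println("first stream:  " + event);
            return event.getResumeToken();
        }
    }

    // Builds a new change stream that starts right after the event identified by the token.
    static ChangeStreamIterable<Document> resumeFrom(MongoCollection<Document> collection,
                                                     BsonDocument resumeToken) {
        return collection.watch().resumeAfter(resumeToken);
    }

    // Blocks and prints every event that happened after the saved token.
    static void run(MongoCollection<Document> collection) {
        BsonDocument token = readOneAndGetToken(collection);
        resumeFrom(collection, token)
                .forEach(event -> System.out.println("resumed stream: " + event));
    }
}
```

In a real application, the token would usually be persisted somewhere durable after each processed event so that a restarted process can pick up where it left off.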
Remember to uncomment only one Change Stream example at a time.
Change Streams are very easy to use and set up in MongoDB. They are the key to any real-time processing system.
The only remaining problem here is how to get this in production correctly. Change Streams are basically an infinite loop, processing an infinite stream of events. Multiprocessing is, of course, a must-have for this kind of setup, especially if your processing time is greater than the time separating 2 events.
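To illustrate the multiprocessing idea, here is a minimal sketch using only the JDK: one thread reads the events and hands each one to a fixed pool of workers, so a slow handler doesn't hold up the reader. The class ParallelEventProcessor is hypothetical, and the events are plain strings standing in for change event documents.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ParallelEventProcessor<T> {

    private final ExecutorService workers;
    private final Consumer<T> handler;

    ParallelEventProcessor(int threads, Consumer<T> handler) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.handler = handler;
    }

    // Called by the thread reading the stream; queues the event and returns immediately.
    void submit(T event) {
        workers.execute(() -> handler.accept(event));
    }

    // Stops accepting events and waits up to 30 seconds for queued ones to finish.
    void shutdownAndWait() {
        workers.shutdown();
        try {
            if (!workers.awaitTermination(30, TimeUnit.SECONDS)) {
                workers.shutdownNow();
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger processed = new AtomicInteger();
        ParallelEventProcessor<String> processor = new ParallelEventProcessor<>(4, event -> {
            try {
                Thread.sleep(50); // simulate processing slower than the event rate
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            processed.incrementAndGet();
        });
        for (int i = 0; i < 20; i++) {
            processor.submit("event-" + i); // in a real setup: called from the change stream loop
        }
        processor.shutdownAndWait();
        System.out.println("processed " + processed.get() + " events");
    }
}
```

One design consequence to keep in mind: with a pool like this, events are no longer guaranteed to be handled in the order they were emitted, so two changes to the same document may be processed out of order unless events are routed to workers by document key.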
In the next blog post, I will show you multi-document ACID transactions in Java.