Using MongoDB Change Streams in Java

Rajesh S Nair • 6 min read • Published Apr 04, 2023 • Updated Aug 28, 2024
Tags: MongoDB, Change Streams, Java
MongoDB has come a long way from being a database engine developed at the internet company DoubleClick to becoming a leading NoSQL data store that serves large clients across many domains.
As the database engine grew, MongoDB kept adding features and improvements to the product, making it a go-to NoSQL database for new requirements and product development.
One such feature is change streams, added in the MongoDB 3.6 release. Before version 3.6, similar functionality required keeping a tailable cursor open on the oplog. Change streams enable real-time streaming of data change events from the database.
Event-driven streaming of data is a critical requirement in many products and features built today. Many applications need changes in one data source to propagate to another in real time, or need to perform certain actions when data in the source changes. Logging is one such use case: the application might need to collect, process, and transmit logs in real time, and so needs a streaming tool or platform such as change streams.

What are change streams in MongoDB?

As the name indicates, change streams are the MongoDB feature that captures a "change" and "streams" it to the desired target.
It is an API that lets you subscribe your application to changes on a single collection, a database, or even the entire deployment. You do not need any middleware or data polling to use this event-driven, real-time data capture.
Change streams are built on replication: they use the operation log (oplog) that MongoDB generates to replicate data between replica set members.
The oplog is a special capped collection that records all operations that modify the data stored in the databases. The larger the oplog, the more operations it can hold. Because change streams are driven by the oplog, change events are delivered in the same order in which the operations were applied to the database.
Figure: MongoDB change stream process
As seen in the above flow, when a write operation (insert, update, or delete) happens on the MongoDB database, the oplog records it, and MongoDB uses those oplog entries to stream the changes to real-time applications/data receivers.
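To make the capped, ordered nature of the oplog more concrete, here is a toy in-memory model in plain Java. It is only a sketch and not how MongoDB implements the oplog: the class CappedOplogSketch, its methods, and the integer sequence numbers are hypothetical stand-ins (real change streams use resume tokens rather than counters). It illustrates two points: entries are read back in the order they were written, and once the log wraps around, a reader that has fallen too far behind can no longer continue from where it stopped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a capped, ordered operation log (not MongoDB's real oplog). */
class CappedOplogSketch {
    private final int capacity;
    private final Deque<String> ops = new ArrayDeque<>();
    private long firstSeq = 1; // sequence number of the oldest retained entry

    CappedOplogSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Appends a write; when the log is full, the oldest entry is discarded. */
    void append(String op) {
        if (ops.size() == capacity) {
            ops.removeFirst();
            firstSeq++;
        }
        ops.addLast(op);
    }

    /** Returns, in write order, every retained entry whose sequence number is greater than afterSeq. */
    List<String> readAfter(long afterSeq) {
        List<String> result = new ArrayList<>();
        long seq = firstSeq;
        for (String op : ops) {
            if (seq > afterSeq) {
                result.add(op);
            }
            seq++;
        }
        return result;
    }

    /** True if the entry right after afterSeq has not yet been discarded. */
    boolean canResumeAfter(long afterSeq) {
        return afterSeq + 1 >= firstSeq;
    }
}
```

This is one reason oplog size matters for change streams: a larger oplog gives a disconnected consumer more time to come back and resume before the entries it needs have been overwritten.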

Kafka vs change streams

If we compare MongoDB and Kafka, they fall into completely separate buckets. MongoDB is a NoSQL database that stores JSON-like documents. Kafka is an event streaming platform for real-time data feeds. It is primarily used as a publish-subscribe messaging service that provides a replicated message log to stream data from one source to another.
Kafka helps ingest huge data sets from the desired data sources, filter/aggregate the data, and deliver it to the intended destination reliably and efficiently. Although MongoDB is a database system and its use case is miles apart from a messaging system like Kafka, the change streams feature gives it some functionality similar to Kafka's.
Basically, change streams act as a messaging service that streams real-time changes from any collection in your MongoDB database. They let you filter/aggregate that data and store it back in the same MongoDB database. In short, if you have a narrow use case that does not require a generalized solution and is confined to MongoDB as the data source, you could go ahead with change streams as your streaming solution. If you need to involve data sources outside of MongoDB and want a generalized messaging solution, Kafka makes more sense.
With change streams, you do not need a separate license or server to host your messaging service. Unlike with Kafka, you get the best of both worlds: a great database and an efficient messaging system.
MongoDB does provide Kafka connectors that can move data in and out of Kafka topics in real time, but if your use case is not big enough to justify investing in Kafka, change streams could be the perfect substitute for streaming your data.
Moreover, the MongoDB Kafka source connector itself uses change streams under the hood, and with Kafka you would still have to build your setup by configuring connector services and starting source and sink connectors for MongoDB. With change streams alone, you simply watch for changes in the collection you want, without any prerequisite setup.

How change streams work

Change streams, once opened, act as an event monitoring mechanism on your database or collection or, in some cases, on specific documents within a collection.
The core functionality is helping you "watch" an entity for changes. The background work this requires is handled by functionality MongoDB already has: the oplog.
Although it comes with the overhead of tying up system resources, this event monitoring for your source collection has use cases in many business-critical scenarios, like capturing log inputs of application data or monitoring inventory changes for an e-commerce webshop. So, it's important to match change streams with the correct use case.
As the oplog drives the entire change stream mechanism, a replicated environment (a replica set, even one with a single node) is the first prerequisite for using change streams. You will also need to:
  • Start a change stream on the intended collection/database.
  • Have the necessary CPU resources for the cluster.
Instead of setting up a self-hosted cluster to satisfy the above checklist, you can use the cloud-hosted solution, MongoDB Atlas. With Atlas, you get a ready-to-use setup in a few clicks. Since change streams are resource-intensive, keep the cost factor in mind when spinning up an Atlas instance for your data streaming.

Implementing change streams in your Java Spring application

In backend development today, streams are a hot topic because they give developers a systematic pipeline for processing the persisted data their applications use. Streaming data helps generate reports, drive notifications when certain criteria are met, or, in some cases, alter a schema based on the events received through streams.
Here, I will demonstrate how to implement a change stream for a Java Spring application.
Once the prerequisites for change streams are met, the work at the database level is almost done. You now need to choose the collection you want to watch.
Let's consider that you have a Java Spring application for an e-commerce website, and you have a collection called e_products, which holds information about the products sold on the website.
To keep it simple, the fields of the collection can be:
{"_id", "productName", "productDescription", "price", "colors", "sizes"}
Your Java API reads these fields from the collection to show the product information on your website when a product is searched for or clicked on.
Now, say there exists another collection, vendor_products, which holds data from another source (e.g., another product vendor). In this case, it holds some of the same products as e_products, but with more size and color options.
You want your application to stay in sync with the latest available sizes and colors for each product. Change streams can help you do just that. You can open one on your vendor_products collection to watch for newly inserted products, and then, for each insertion event, run some logic that adds the colors/sizes to the e_products collection used by your application.
You could create a microservice application specifically for this use case. With a dedicated microservice, you can allocate sufficient CPU/memory for the application to dedicate a thread to watching your vendor_products collection. The configuration class in your Spring application would have the following code to start the watch:
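import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Filters.eq;
import static java.util.Collections.singletonList;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;

@Async // the forEach below blocks, so run this method off the caller's thread
public void runChangeStreamConfig() throws InterruptedException {
    // Codec registry that maps BSON documents to POJOs such as VendorProducts.
    CodecRegistry pojoCodecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(),
            fromProviders(PojoCodecProvider.builder().automatic(true).build()));
    MongoCollection<VendorProducts> vendorCollection = mongoTemplate.getDb().withCodecRegistry(pojoCodecRegistry).getCollection("vendor_products", VendorProducts.class);
    // Only let insert events through the change stream.
    List<Bson> pipeline = singletonList(match(eq("operationType", "insert")));
    // Watch vendor_products and handle each insert as it arrives (assumes string _id values).
    vendorCollection.watch(pipeline).forEach(s ->
            mergeFieldsVendorToProducts(s.getDocumentKey().get("_id").asString().getValue())
    );
}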
In the above code snippet, you can see how the collection to be watched is selected and that the monitored operation type is "insert." The stream will therefore only report new products added to this collection. If needed, we could also monitor "update" or "delete" operations by matching on those operation types.
Once this is in place, whenever a new product is added to vendor_products, the lambda passed to forEach is invoked and the _id of that product is passed to the mergeFieldsVendorToProducts() method, where you can write your logic to merge the various properties from vendor_products into the e_products collection.
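The @Async annotation in the snippet matters because iterating a change stream with forEach blocks the calling thread for as long as the stream stays open. The sketch below models that arrangement with the JDK only. It assumes a BlockingQueue as a stand-in for the change stream, so no MongoDB driver is involved, and WatcherThreadSketch, publish, and stopAndWait are hypothetical names used for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** JDK-only model of a dedicated watcher thread; the queue stands in for a change stream. */
class WatcherThreadSketch {
    private static final String STOP = "__stop__"; // sentinel that ends the loop

    private final BlockingQueue<String> events = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::watchLoop, "vendor-products-watcher");

    /** Starts the background thread; the caller returns immediately. */
    void start() {
        worker.start();
    }

    /** Simulates an insert event carrying the _id of a new vendor product. */
    void publish(String productId) {
        events.add(productId);
    }

    /** Blocks on the queue, much as forEach blocks on an open change stream. */
    private void watchLoop() {
        try {
            while (true) {
                String productId = events.take();
                if (STOP.equals(productId)) {
                    return;
                }
                handled.add(productId); // a real service would merge fields here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and exit
        }
    }

    /** Lets the worker finish the events already queued, then waits for it to exit. */
    void stopAndWait() {
        events.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Product ids handled so far, in arrival order. */
    List<String> handled() {
        return handled;
    }
}
```

Because the loop lives on its own thread, the rest of the application stays responsive while events are processed one at a time in arrival order.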
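// Fragment of the merge logic. Here, s is assumed to be a vendor_products document
// and field the name of the property to copy (for example, "colors" or "sizes").
forEach(s -> {
    // Select the e_products document with the same _id.
    Query query = new Query();
    query.addCriteria(Criteria.where("_id").is(s.get("_id")));
    // Overwrite the field with the vendor's value.
    Update update = new Update();
    update.set(field, s.get(field));
    mongoTemplate.updateFirst(query, update, EProducts.class);
})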
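As one possible shape for the merge step itself, the sketch below unions the vendor's options into a product's existing list without creating duplicates. It is plain Java with hypothetical names (OptionMergeSketch, mergeOptions) and assumes that colors and sizes are stored as lists of strings, which the article does not specify.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for combining option lists such as colors or sizes. */
class OptionMergeSketch {
    /**
     * Returns the existing options in their current order, followed by any
     * vendor options that are not already present.
     */
    static List<String> mergeOptions(List<String> existing, List<String> fromVendor) {
        // LinkedHashSet drops duplicates while keeping first-seen order.
        Set<String> merged = new LinkedHashSet<>(existing);
        merged.addAll(fromVendor);
        return new ArrayList<>(merged);
    }
}
```

For example, merging the existing sizes S and M with the vendor sizes M, L, and XL yields S, M, L, XL. If the merge should happen in the database rather than in application code, MongoDB's $addToSet update operator combined with $each has similar add-if-absent semantics.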
This is a small use case for change streams; there are many such examples where change streams can come in handy. It's about using this tool for the right use case.

Conclusion

In conclusion, change streams in MongoDB provide a powerful and flexible way to monitor changes to your database in real time. Whether you need to react to changes as they happen, synchronize data across multiple systems, or build custom event-driven workflows, change streams can help you achieve these goals with ease.
By leveraging the power of change streams, you can improve the responsiveness and efficiency of your applications, reduce the risk of data inconsistencies, and gain deeper insights into the behavior of your database.
While there is a bit of a learning curve when working with change streams, MongoDB provides comprehensive documentation and a range of examples to help you get started. With a little practice, you can take advantage of the full potential of change streams and build more robust, scalable, and resilient applications.
