Coming in MongoDB 4.2 - On-Demand Materialized Views

In the previous "Coming In", we looked at some of the enhancements in the aggregation framework, but we missed one because it's a really big enhancement. It's a new stage for the pipelines called $merge and what it delivers is the ability to create collections based on an aggregation and update those created collections efficiently.

How Are On-Demand Materialized Views New?

Now, creating collections based on an aggregation may sound like $out, a stage that has been in the aggregation framework since way back in MongoDB 2.6. The $out stage can take the results of an aggregation and put them into a new collection, replacing the collection's entire contents with the new results. Useful though this process is, it can consume a lot of CPU and IO, regenerating the entire collection every time. At least $out does this atomically, building a temporary collection and only swapping it in when the aggregation pipeline has completed its work.

What if we could just update the generated results collection rather than rebuild it completely? That's where 4.2's $merge comes in. It lets you incrementally update that results collection every time it is run. It's a stage with a lot of options and a lot of power, so let's look at a worked example.

Say we have an Airbnb dataset from the MongoDB Atlas sample data, full of properties around the world, each recording the number of beds it has available. And say we need to get the total number of beds per country.

Well, aggregation makes that easy.

> db.listingsAndReviews.aggregate([ { $group: { _id: "$address.country", bedcount: { $sum: "$beds" } } } ] )
{ "_id" : "Spain", "bedcount" : 1477 }
{ "_id" : "United States", "bedcount" : 2347 }
{ "_id" : "Portugal", "bedcount" : 1332 }
{ "_id" : "Turkey", "bedcount" : 1294 }
{ "_id" : "Brazil", "bedcount" : 1518 }
{ "_id" : "Australia", "bedcount" : 1225 }
{ "_id" : "Hong Kong", "bedcount" : 1042 }
{ "_id" : "Canada", "bedcount" : 1213 }
{ "_id" : "China", "bedcount" : 29 }
>

Great, but we don't want to run this aggregation every time someone needs the data. Let's use $merge to write the results out to another collection. It's as simple as adding

{ $merge: { into: "bedcount" } }

as the last stage of the pipeline, like so:

> db.listingsAndReviews.aggregate([ { $group: { _id: "$address.country", bedcount: { $sum: "$beds" } } }, { $merge: { into: "bedcount" } } ] )
MongoDB Enterprise Exploratorium-shard-0:PRIMARY> db.bedcount.find()
{ "_id" : "Spain", "bedcount" : 1477 }
{ "_id" : "United States", "bedcount" : 2347 }
{ "_id" : "Portugal", "bedcount" : 1332 }
{ "_id" : "Turkey", "bedcount" : 1294 }
{ "_id" : "Brazil", "bedcount" : 1518 }
{ "_id" : "Australia", "bedcount" : 1225 }
{ "_id" : "Hong Kong", "bedcount" : 1042 }
{ "_id" : "Canada", "bedcount" : 1213 }
{ "_id" : "China", "bedcount" : 29 }
>

Now, developers can refer to this collection for their results and not have to run the aggregation every time. And to update it, all we need to do is to run the aggregation again and it'll update all the values in place.

It's not just writing the entire result set out, though. It uses the unique _id of each result to match against the existing documents in the collection. Actually, using the _id is just the default. Using the on property, you can match on any field, or combination of fields, as long as a unique index covers it.

Anyway, if the _id is matched, then by default the $merge stage takes the new result document and the result document in the collection and merges the two to produce a composite document of all their fields. If it isn't matched, then the new result document is inserted. This behavior is controlled by two properties of $merge called whenMatched and whenNotMatched. By default, whenMatched is set to "merge" and merges the two documents together. It can instead be set to "replace" to completely swap in the new document, to "keepExisting" to leave things untouched, or to "fail" to error out of the aggregation.
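
To make those matching rules concrete, here is a minimal in-memory sketch in plain JavaScript. It is not MongoDB code: applyMerge, the sample documents and the note field are all made up for illustration, and "merge" is approximated as a top-level field merge where the incoming values win.

```javascript
// In-memory sketch of $merge's matching rules; this is NOT MongoDB code.
// `target` stands in for the output collection and `results` for the
// documents arriving from the earlier pipeline stages.
function applyMerge(target, results, options = {}) {
  const {
    on = "_id",                // field used to match documents
    whenMatched = "merge",     // "merge" | "replace" | "keepExisting" | "fail"
    whenNotMatched = "insert", // "insert" | "discard" | "fail"
  } = options;

  // Index the stored documents by the `on` field.
  const byKey = new Map(target.map((doc) => [doc[on], doc]));

  for (const incoming of results) {
    const key = incoming[on];
    const current = byKey.get(key);

    if (current === undefined) {
      if (whenNotMatched === "fail") throw new Error(`no match for ${key}`);
      if (whenNotMatched === "insert") byKey.set(key, { ...incoming });
      continue; // "discard" simply drops the incoming document
    }

    if (whenMatched === "fail") throw new Error(`match found for ${key}`);
    if (whenMatched === "replace") byKey.set(key, { ...incoming });
    if (whenMatched === "merge") byKey.set(key, { ...current, ...incoming });
    // "keepExisting" leaves the stored document untouched
  }
  return [...byKey.values()];
}

// Hypothetical data: one stored document with an extra field, two new results.
const stored = [{ _id: "Spain", bedcount: 1477, note: "checked" }];
const fresh = [
  { _id: "Spain", bedcount: 1481 },
  { _id: "China", bedcount: 29 },
];

const merged = applyMerge(stored, fresh);
const replaced = applyMerge(stored, fresh, { whenMatched: "replace" });
console.log(merged);   // Spain keeps its note field and gets the new bedcount
console.log(replaced); // Spain is swapped wholesale, so the note field is gone
```

With the defaults, the Spain document keeps its extra field while picking up the new count, and China is inserted; switching whenMatched to "replace" drops anything the new result doesn't carry.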

Pipeline Powered Updates In $merge

If that's not enough for you, then there's also the ability to use pipeline updates to perform more complex update operations. Take our beds example. Say you've been asked to add a last field which holds the time when the value last changed. With $merge, that's now doable without leaving the aggregation. We'll make the whenMatched value a pipeline with a $set stage.

> db.listingsAndReviews.aggregate([
 { $group: { _id: "$address.country", bedcount: { $sum: "$beds" } } },
 {
   $merge: {
     into: "bedcount",
     whenMatched: [
       {
         $set: {

Now we need to define what gets set to which value. First up is the bedcount.

           bedcount: "$$new.bedcount",

The $$new notation says "take this value from the new, just calculated doc". So we're going to copy over the new bedcount. Now, we need to set our last field. We can do this with a conditional operator. If the bedcount written in the materialized view is the same as the incoming bedcount, we'll leave the value alone, copying over the old $last to the record. If it isn't the same, we'll use the value $$NOW, which, as we have previously mentioned, returns the current date and time. That all looks like this:

           last: {
             $cond: {
               if: { $ne: ["$bedcount", "$$new.bedcount"] },
               then: "$$NOW",
               else: "$last"
             }
           }
         }
       }
     ]
   }
 }
]);

And when we run it for the first time and check results we get:

> db.bedcount.find()
{ "_id" : "Canada", "bedcount" : 1213, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "United States", "bedcount" : 2347, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Spain", "bedcount" : 1477, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Turkey", "bedcount" : 1294, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Portugal", "bedcount" : 1332, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "China", "bedcount" : 29, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Australia", "bedcount" : 1225, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Brazil", "bedcount" : 1518, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Hong Kong", "bedcount" : 1042, "last" : ISODate("2019-08-01T12:50:36.207Z") }

Pop into the database, and add a couple of beds to a property in Spain and rerun the aggregation:

> db.listingsAndReviews.aggregate([ { $group: { _id: "$address.country", bedcount: { $sum: "$beds" } } } , { $merge: { into: "bedcount", whenMatched: [ { $set: { bedcount:"$$new.bedcount", last: { $cond: { if: { $ne: [ "$bedcount", "$$new.bedcount" ] }, then:"$$NOW", else: "$last" } } } } ] } } ] )
MongoDB Enterprise Exploratorium-shard-0:PRIMARY> db.bedcount.find()
{ "_id" : "Canada", "bedcount" : 1213, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "United States", "bedcount" : 2347, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Spain", "bedcount" : 1481, "last" : ISODate("2019-08-01T12:53:18.579Z") }
{ "_id" : "Turkey", "bedcount" : 1294, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Portugal", "bedcount" : 1332, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "China", "bedcount" : 29, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Australia", "bedcount" : 1225, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Brazil", "bedcount" : 1518, "last" : ISODate("2019-08-01T12:50:36.207Z") }
{ "_id" : "Hong Kong", "bedcount" : 1042, "last" : ISODate("2019-08-01T12:50:36.207Z") }
>

And as you can see, Spain has four more beds and an updated timestamp.

We aren't setting the whenNotMatched property here; we don't need to as the "insert" default does what we want, but we do still have the option to "discard" the new document or "fail" if we want.
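
If the $cond logic in that whenMatched pipeline is hard to read in its nested form, the same decision can be sketched in plain JavaScript. This is only an illustration, not MongoDB code: updateMatched is a made-up helper, stored stands in for the document already in bedcount, incoming stands in for $$new, and now stands in for $$NOW.

```javascript
// Plain-JavaScript sketch of the whenMatched pipeline's logic; NOT MongoDB code.
function updateMatched(stored, incoming, now) {
  const changed = stored.bedcount !== incoming.bedcount; // the $ne comparison
  return {
    ...stored,
    bedcount: incoming.bedcount,       // bedcount: "$$new.bedcount"
    last: changed ? now : stored.last, // $cond: then "$$NOW", else "$last"
  };
}

// Timestamps taken from the two runs shown above.
const firstRun = new Date("2019-08-01T12:50:36.207Z");
const secondRun = new Date("2019-08-01T12:53:18.579Z");

const spainAfter = updateMatched(
  { _id: "Spain", bedcount: 1477, last: firstRun },
  { _id: "Spain", bedcount: 1481 },
  secondRun
);
const canadaAfter = updateMatched(
  { _id: "Canada", bedcount: 1213, last: firstRun },
  { _id: "Canada", bedcount: 1213 },
  secondRun
);

console.log(spainAfter.last.toISOString());  // 2019-08-01T12:53:18.579Z
console.log(canadaAfter.last.toISOString()); // 2019-08-01T12:50:36.207Z
```

Only Spain's count changed between the runs, so only Spain picks up the second run's timestamp, which matches the shell output above.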

Optimizing Updates

Of course, we have a fairly static dataset here, and each update still comes from running an aggregation over every document in the collection. How you aggregate your data can boost your performance. Say we wanted to maintain a collection of top-rated properties, those with a rating of 99 or higher. We could scan the entire listings collection every time and regenerate our new collection, but we wouldn't want to do that regularly. It just so happens that each document in the dataset has a last_scraped date and time. So if we initialize our new collection using the epoch as the starting date, we'll process and update all the records:

> db.listingsAndReviews.aggregate([ { $match: { "last_scraped": { $gt:ISODate("1970-01-01T00:00:00Z") }, "review_scores.review_scores_rating": { $gte: 99 } } }, { $merge: { into:"recentTopRates" } } ] )

Now, recentTopRates contains all the highly rated properties. In the natural course of things, the listingsAndReviews collection would be updated with newly scraped data. At any point, we can rapidly update the recentTopRates collection by re-running the aggregation with the date we last updated it.

> db.listingsAndReviews.aggregate([ { $match: { "last_scraped": { $gt:ISODate("2019-07-31T00:00:00Z") }, "review_scores.review_scores_rating": { $gte: 99 } } }, { $merge: { into:"recentTopRates" } } ] )

Now, only the documents scraped since that date are checked for a high rating and only the ones that pass that are sent into the $merge stage to update the recentTopRates collection. That's way faster than regenerating the collection with $out or an unfiltered $merge. Of course, if something stops being top rated, it will persist in the collection, but that can be handled with a TTL index that expires entries or an aggregation which works out which properties have left the list and need to be removed.
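
The incremental pattern can also be sketched in memory to show why the second run touches so little. This is plain JavaScript rather than MongoDB code: refreshTopRated and the three listings are invented for the example, while the field names follow the sample dataset.

```javascript
// In-memory sketch of the incremental refresh; NOT MongoDB code.
// `view` stands in for recentTopRates, `since` for the date in the $match.
function refreshTopRated(listings, view, since) {
  const byId = new Map(view.map((doc) => [doc._id, doc]));
  let mergedCount = 0;

  for (const listing of listings) {
    const recent = listing.last_scraped > since;                         // $gt on last_scraped
    const topRated = listing.review_scores.review_scores_rating >= 99;   // $gte on rating
    if (!recent || !topRated) continue;

    // Default $merge behavior: merge into a match on _id, insert otherwise.
    byId.set(listing._id, { ...byId.get(listing._id), ...listing });
    mergedCount += 1;
  }
  return { view: [...byId.values()], mergedCount };
}

// Hypothetical listings: two top rated, one of them scraped recently.
const listings = [
  { _id: 1, last_scraped: new Date("2019-07-15T00:00:00Z"), review_scores: { review_scores_rating: 100 } },
  { _id: 2, last_scraped: new Date("2019-08-02T00:00:00Z"), review_scores: { review_scores_rating: 99 } },
  { _id: 3, last_scraped: new Date("2019-08-02T00:00:00Z"), review_scores: { review_scores_rating: 80 } },
];

// First run from the epoch, then a refresh from the last update date.
const initial = refreshTopRated(listings, [], new Date(0));
const later = refreshTopRated(listings, initial.view, new Date("2019-07-31T00:00:00Z"));

console.log(initial.mergedCount); // 2
console.log(later.mergedCount);   // 1
```

The first run writes both top-rated listings; the refresh only rewrites the one scraped after the cutoff, and the view still holds both.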

Beyond Materialized Views

This is, of course, an example to give you a feel for how you can create materialized views on demand and the kind of flexibility you have to customize the process. As it's a different collection, you can also index it in a different way to the source collection, matching your users' or applications' queries.

There are some other differences between the new $merge and the older $out. $merge has much more flexibility over where it can read from and write to. For example, it can read from or write to sharded collections ($out can read from a sharded collection but cannot write to one), allowing your materialized view to span multiple shards for scaled-out collections.

It can also write results to a different database, allowing you to not only perform an updatable aggregation but also migrate the results to a different database on a different node. This enables aggregated data and updates to be moved to an analytics node, reducing the production workload by isolating it from report and chart generation.
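
As a sketch of what that looks like, the into property accepts a document naming both a database and a collection instead of a plain collection name. The pipeline below is the bed-count example again; "reporting" is a hypothetical database name picked for illustration.

```javascript
// The bed-count pipeline with $merge pointed at another database.
// "reporting" is a hypothetical database name chosen for this example.
const pipeline = [
  { $group: { _id: "$address.country", bedcount: { $sum: "$beds" } } },
  { $merge: { into: { db: "reporting", coll: "bedcount" } } },
];

// In the mongo shell, this array would be passed to
// db.listingsAndReviews.aggregate(pipeline).
console.log(JSON.stringify(pipeline[1]));
```

As in the earlier examples, $merge stays as the final stage of the pipeline.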

So there we have it: $merge, aka On-Demand Materialized Views and probably one of the most powerful new additions to MongoDB 4.2.