GIANT Stories at MongoDB

Data Enrichment with MongoDB Stitch

Maxime Beugnet

mongodb

Objectives

Here is what we are going to achieve in this tutorial:

  • Firstly, we are going to write a document to MongoDB using MongoDB Stitch.
  • The result in our MongoDB collection will look like this:

    
    {
       "_id": ObjectId("5bb27712dced5f37bebf388c"),
       "Title": "Guardians of the Galaxy"
    }
    
    
  • Secondly, a trigger will catch this new insertion and start a function.
  • Lastly, this function will call the OMDB external API with the given movie title, fetch data about that movie, and finally enrich our MongoDB document with the data we gathered from this API.
  • This is the final result we expect in our MongoDB collection:

    
    {  
       "_id": ObjectId("5bb27712dced5f37bebf388c"),
       "Title":"Guardians of the Galaxy",
       "Year":"2014",
       "Rated":"PG-13",
       "Released":"01 Aug 2014",
       "Runtime":"121 min",
       "Genre":"Action, Adventure, Comedy",
       "Director":"James Gunn",
       "Writer":"James Gunn, Nicole Perlman, Dan Abnett (based on the Marvel comics by), Andy Lanning (based on the Marvel comics by), Bill Mantlo (character created by: Rocket Raccoon), Keith Giffen (character created by: Rocket Raccoon), Jim Starlin (characters created by: Drax the Destroyer,  Gamora & Thanos), Steve Englehart (character created by: Star-Lord), Steve Gan (character created by: Star-Lord), Steve Gerber (character created by: Howard the Duck), Val Mayerik (character created by: Howard the Duck)",
       "Actors":"Chris Pratt, Zoe Saldana, Dave Bautista, Vin Diesel",
       "Plot":"A group of intergalactic criminals are forced to work together to stop a fanatical warrior from taking control of the universe.",
       "Language":"English",
       "Country":"USA",
       "Awards":"Nominated for 2 Oscars. Another 52 wins & 99 nominations.",
       "Poster":"https://m.media-amazon.com/images/M/MV5BMTAwMjU5OTgxNjZeQTJeQWpwZ15BbWU4MDUxNDYxODEx._V1_SX300.jpg",
       "Ratings":[  
          {  
             "Source":"Internet Movie Database",
             "Value":"8.1/10"
          },
          {  
             "Source":"Rotten Tomatoes",
             "Value":"91%"
          },
          {  
             "Source":"Metacritic",
             "Value":"76/100"
          }
       ],
       "Metascore":"76",
       "imdbRating":"8.1",
       "imdbVotes":"871,949",
       "imdbID":"tt2015381",
       "Type":"movie",
       "DVD":"09 Dec 2014",
       "BoxOffice":"$270,592,504",
       "Production":"Walt Disney Pictures",
       "Website":"http://marvel.com/guardians",
       "Response":"True"
    }
    
    

Prerequisites

First of all, if you want to try this at home, it is very easy. The only requirement is to create a free MongoDB Atlas cluster. This video will show you the steps.

MongoDB Stitch is our serverless platform, built by MongoDB on top of MongoDB Atlas. Once our MongoDB Atlas cluster is ready to use, link a MongoDB Stitch application to it:

  • Click on the left panel on “Stitch Apps”,
  • Then click on “Create New Application”,
  • Pick the name you want for your application,
  • Link it to your free MongoDB Atlas cluster.

Actions

To be able to send a document to MongoDB, we are going to use an HTTP POST service.

  • On the left panel, click on “Services”,
  • Then click on “Add a Service”,
  • Choose the service name “IMDB”,
  • Note: “IMDB” will be reused later in the function code. If you choose another name, please make sure to update the code accordingly.
  • Click on “Add Service”,
  • Click on “Add Incoming Webhook”,
  • And replicate the settings shown in the screenshot below.

When this is done, hit the “Save” button and you will be in the “Function Editor” screen.

  • Enter the following code:

    exports = function(payload, response) {
      // Get a handle on the linked MongoDB Atlas cluster
      const mongodb = context.services.get("mongodb-atlas");
      const movies = mongodb.db("stitch").collection("movies");
      // Parse the POST body and insert it as a new document
      var body = EJSON.parse(payload.body.text());
      movies.insertOne(body)
      .then(result => {
        response.setStatusCode(201); // 201 Created
      });
    };

  • Click the “Save” button again.

Now that our Service is ready, we can test it!

  • Go to “Settings” and you will find your Webhook URL.
  • You can now send an HTTP POST request like this to MongoDB Stitch:

    curl -H "Content-Type: application/json" -d '{"Title":"Guardians of the Galaxy"}' https://webhooks.mongodb-stitch.com/api/client/v2.0/app/stitchtapp-abcde/service/IMDB/incoming_webhook/post_movie_title?secret=test

    Note: I used a curl command but feel free to use Postman or whatever you are used to.
  • We can check that it worked by having a look at the content of the “stitch.movies” collection in our MongoDB Atlas cluster:
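
If you prefer the Mongo Shell to the Atlas UI, a quick way to check is to connect to the cluster and query the collection directly (a minimal sketch; replace the connection string and user with your own):

$ mongo "mongodb+srv://your-cluster.mongodb.net/test" --username your_user
> use stitch
> db.movies.find().pretty()
{
    "_id" : ObjectId("5bb27712dced5f37bebf388c"),
    "Title" : "Guardians of the Galaxy"
}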

Now that we can insert a new document into MongoDB Atlas using Stitch, we are going to create the trigger.

  • On the left panel click on “Triggers”,
  • Then click on “Add a Database Trigger”,
  • And configure it as shown below.
  • This is the code you will need to create the new function:

    exports = function(changeEvent) {
      // Extract the _id and the movie title from the insert event
      var docId = changeEvent.documentKey._id;
      var title = encodeURIComponent(changeEvent.fullDocument.Title.trim());

      var movies = context.services.get("mongodb-atlas").db("stitch").collection("movies");
      var imdb_url = "http://www.omdbapi.com/?apikey=a12b1234&t=" + title;

      const http = context.services.get("IMDB"); // change the name of the service here if you used a different name.
      return http
        .get({ url: imdb_url })
        .then(resp => {
          // Replace the inserted document with the enriched one from the OMDB API
          var doc = EJSON.parse(resp.body.text());
          movies.updateOne({"_id": docId}, doc);
        });
    };

  • As you can see in the middle of the code, I am using the OMDB API and I replaced the API key with a fake one.
  • You can create your own free API key here with just a valid email address: http://www.omdbapi.com/apikey.aspx - limited to 1000 calls per day.
  • Once this is done, you can just replace my fake API key (a12b1234) with your own key.

Now that this is ready, we just have to test it by repeating the same curl command we used before. Feel free to use another movie title if you have a better one in mind ;-).

I removed the previous test we made before adding the trigger and now this is what I have in my collection:

Let’s review for a second what we have done:

  • We inserted a new document using an HTTP POST service hosted by MongoDB Stitch containing only an “_id” (automatically populated by MongoDB) and a “Title”.
  • The insertion of this document is detected by the trigger, which calls an external Web API using this “Title” as the parameter.
  • We then parse the body of the result we get from this API and update the document we inserted a few milliseconds earlier.

Conclusion

With just a few lines of code, you can enrich the data where it lives with MongoDB Stitch. This is a great way to leverage your microservices architecture and finally switch to an event-driven architecture.

Next Steps

Thanks for taking the time to read my post. I hope you found it useful and interesting.

If MongoDB Stitch is something you are considering in production, you can discover here how the billing works.

If you want to query your data sitting in MongoDB Atlas using MongoDB Stitch, I recommend this article from Michael Lynn.

Capture IOT Data With MongoDB Stitch in 5 Minutes

Maxime Beugnet

mongodb, IoT

Capturing IOT data is a complex task for 2 main reasons:

  • We have to deal with a huge amount of data, so we need a rock-solid architecture.
  • We must do this while maintaining a bulletproof level of security.

First, let’s have a look at a standard IOT capture architecture:

On the left, we have our sensors. Let’s assume they can push data every second over TCP using an HTTP POST request, and let’s suppose we have a million of them. We need an architecture capable of handling a million queries per second and able to resist any kind of network or hardware failure. The requests need to be distributed evenly to the application servers using load balancers, and finally, the application servers push the data to the multiple mongos routers of our MongoDB sharded cluster.

As you can see, this architecture is relatively complex to install. We need to:

  • buy and maintain a lot of servers,
  • apply security updates to the operating systems and applications on a regular basis,
  • have an auto-scaling capability (to reduce maintenance costs & enable automatic failover)...

This kind of architecture is expensive, and the maintenance cost can be quite high as well.

Now let’s solve this same problem with MongoDB Stitch!

Once you have created a MongoDB Atlas cluster, you can attach a MongoDB Stitch application to it and then create an HTTP Service containing the following code:

exports = function(payload, response) {
  // Get a handle on the linked MongoDB Atlas cluster
  const mongodb = context.services.get("mongodb-atlas");
  const sensors = mongodb.db("stitch").collection("sensors");
  // Parse the sensor payload and timestamp it server-side
  var body = EJSON.parse(payload.body.text());
  body.createdAt = new Date();
  sensors.insertOne(body)
  .then(result => {
    response.setStatusCode(201); // 201 Created
  });
};

And that’s it! That’s all we need! Our HTTP POST service can be reached directly by the sensors from the webhook provided by MongoDB Stitch like so:

curl -H "Content-Type: application/json" -d '{"temp":22.4}' https://webhooks.mongodb-stitch.com/api/client/v2.0/app/stitchtapp-abcde/service/sensors/incoming_webhook/post_sensor?secret=test

Because MongoDB Stitch is capable of scaling automatically according to demand, you no longer have to take care of the infrastructure or handle failovers.

Next Step

Thanks for taking the time to read my post. I hope you found it useful and interesting.

If you are looking for a very simple way to get started with MongoDB, you can do that in just 5 clicks on our MongoDB Atlas database service in the cloud.

You can also try MongoDB Stitch for free and discover how the billing works.

If you want to query your data sitting in MongoDB Atlas using MongoDB Stitch, I recommend this article from Michael Lynn.

MongoDB On The Road - DevSharp in Gdansk, Poland

Maxime Beugnet

mongodb

DevSharp 2018 was held in Gdansk on September 21st, 2018, in the Stary Maneż cultural center. It’s only 15 minutes away from the airport, so it’s very easy to get there, and of course, all the talks were in English.

This free conference was such a victim of its own success that the organizers had to increase the number of seats. Initially planned for 250 people, about 400 passionate developers answered the call.

The conference was sponsored by IHS Markit, automotiveMastermind, Carfax and, of course, MongoDB. Seven talks were planned during the day, with speakers from Microsoft, 8x8, and JetBrains, for example.

I also happened to have a slot to speak about MongoDB Atlas & MongoDB Stitch, and I explained how you can benefit from our platforms to accelerate and simplify your interactions with your data.

I shared my presentation here, so feel free to have a look, but I gave a lot of live demos leveraging MongoDB Compass, MongoDB Charts, MongoDB Atlas and MongoDB Stitch, so make sure to come and see me on stage next time :-).

I would definitely recommend this conference, especially if you are a C# developer, so please feel free to join us next year.

I received a lot of questions at the end of my presentation about the MongoDB Drivers and the new Multi-Document ACID Transactions we introduced in MongoDB 4.0.

It was also the largest audience I have spoken to so far. I am really proud, and I can’t wait to go again next year :-). Special thanks to the team for the warm welcome!

Pseudonymization with MongoDB Views: The solution for GDPR and Game of Thrones spoilers

GDPR is now in effect in Europe, and you are still wondering how you are going to handle it? No worries, MongoDB Views are here to help, but first, a little definition.

Definition

“Pseudonymization is a data management procedure by which personally identifiable information fields within a data record are replaced by one or more artificial identifiers, or pseudonyms. A single pseudonym for each replaced field or collection of replaced fields makes the data record less identifiable while remaining suitable for data analysis and data processing.”

MongoDB Views

A view in MongoDB is a read-only projection of an existing collection or another view. The purpose of a view is to provide an alternative way to present documents. As we use the aggregation pipeline to generate a view, a projection can be applied to pseudonymize each field. Consequently, it is possible to create several views based on use cases while keeping a golden copy of the data in the master collection.
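
For reference, the Mongo Shell helper used to create a view takes the view name, the source collection name, and the aggregation pipeline; we will use it below:

db.createView("<viewName>", "<sourceCollection>", [ /* aggregation pipeline */ ])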


Example

I want to show you a practical approach to pseudonymization using views in MongoDB. We will be using a Game of Thrones characters collection, with their status (dead/alive). Here is an example of one document:

{
    "_id" : ObjectId("5ae13de5f54a365254c62ad1"),
    "firstName" : "Daenerys",
    "lastName" : "Targaryen",
    "nickName" : "Khaleesi",
    "house" : "Targaryen",
    "actor" : "Emilia Clarke",
    "gender" : "female",
    "status" : "alive"
}

You can download the dataset here.

And you can import it directly from the command line:

mongoimport --drop -d gameofthrones -c characters gameofthrones.characters.json

Explore the dataset and notice that the “status” is not always specified. Sometimes this key/value pair is missing, and we need to take that into account in our view.
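
For example, you can list the characters that lack this field directly from the Mongo Shell:

> use gameofthrones
> db.characters.find({ "status": { $exists: false } }, { _id: 0, firstName: 1, lastName: 1 })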

Because the “status” field contains sensitive data full of spoilers, we want to hide it in a spoiler-free view.

Create the view

In this example, we used the following aggregation pipeline to generate the view:

> use gameofthrones
> db.characters.aggregate([
{ "$project" :
    {
        _id : 0,
        "firstName" : 1,
        "lastName" : 1,
        "house" : 1,
        "status" :
        {
            "$cond": [ { "$eq": [ "$status", undefined ] }, "Not Specified", "*****" ]
        }
    }
}
])

This aggregation pipeline will project the collection and replace the content of the field “status” with “*****” if this field exists. If not, the “status” will mention “Not Specified”.

Let’s create a view called “charactersNoSpoil”, based on the “characters” collection. We can leverage the aggregation pipeline created in the previous step:

> use gameofthrones
> db.createView(
  "charactersNoSpoil",
  "characters",
  [{
     "$project" :
       {
         _id : 0,
         "firstName" : 1,
         "lastName" : 1,
         "house" : 1,
         "status" :
           { "$cond": [ { "$eq": [ "$status", undefined ] }, "Not Specified", "*****" ] }
       }
  }]
)

With the view created, you can check it by performing a “show collections”, a “db.getCollectionInfos()” or a “db.charactersNoSpoil.find()” command.
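
For instance, “db.getCollectionInfos()” confirms that “charactersNoSpoil” is a view built on the “characters” collection (abridged output; the exact fields may vary with your MongoDB version):

> db.getCollectionInfos({ name: "charactersNoSpoil" })
[
    {
        "name" : "charactersNoSpoil",
        "type" : "view",
        "options" : {
            "viewOn" : "characters",
            "pipeline" : [ { "$project" : { ... } } ]
        }
    }
]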

> use gameofthrones
> db.charactersNoSpoil.find({},{firstName:1, status:1})
{ "firstName" : "Daenerys", "status" : "*****" }
{ "firstName" : "Joffrey", "status" : "*****" }
{ "firstName" : "Robert", "status" : "*****" }
{ "firstName" : "Tyron", "status" : "Not Specified" }
{ "firstName" : "Jaime", "status" : "*****" }
{ "firstName" : "Jon", "status" : "*****" }
{ "firstName" : "Cersei", "status" : "*****" }
{ "firstName" : "Sansa", "status" : "*****" }
{ "firstName" : "Tywin", "status" : "Not Specified" }
{ "firstName" : "Arya", "status" : "Not Specified" }
{ "firstName" : "Eddard", "status" : "Not Specified" }

If you are using MongoDB Compass, you should see the following result.

Working with Views

As mentioned in the introduction, views are read-only and use the indexes of the source collection.

Insert a new document

To add a new document to the view, you have to insert the document into the original collection. The new document will be automatically accessible through the view.

> db.characters.insertOne({
  "firstName": "Hodor", 
  "actor": "Kristian Nairn", 
  "gender": "male", 
  "status": "dead"
})

Then check that this document appears within the “charactersNoSpoil” view.

> db.charactersNoSpoil.find({firstName: "Hodor"}) 
{ "firstName" : "Hodor", "status" : "*****" }

Create an Index

To create an index that could be used by the view, you have to create an index on the collection.

> db.characters.createIndex({"house":1})
> db.charactersNoSpoil.find({house: "Stark"}).explain()

Extract of the winning plan:
"winningPlan" : {
    "stage" : "FETCH",
    "inputStage" : {
        "stage" : "IXSCAN",
        "keyPattern" : {"house" : 1},
        "indexName" : "house_1",
        "isMultiKey" : false,
        "multiKeyPaths" : {"house" : [ ]},
        "isUnique" : false,
        "isSparse" : false,
        "isPartial" : false,
        "indexVersion" : 2,
        "direction" : "forward",
        "indexBounds" : {"house" : ["[\"Stark\", \"Stark\"]"]}
    }
}

And as you can see here, the explain plan shows that we are using the index {“house”:1} for the find on our view.

Set Up Access Control

Now that the view is created, it is important to segregate who can read the view and who can write to the master collection.

To enforce this in our example, you will create users and roles to secure the data.

  • Alice should only be able to write in the master collection,
  • Bob should only be able to read the view.

By doing this, you will prevent unauthorized reads of the “characters” collection, and thus, no spoilers :-).

Before starting

If you have not done so already, you need to enable Role-Based Access Control on your MongoDB server by adding “--auth” on the command line or by adding the following lines to your “/etc/mongod.conf” file.

security:
 authorization: enabled

Then you need to connect, create an admin user and log in as this new admin user.

> use admin
> db.createUser( { user: "admin", pwd: "safepassword", roles:["root"] })
> db.auth("admin","safepassword")

If you want to know more about our security recommendations for running MongoDB in production, have a look at our security checklist.

Create the roles

Now that we have a super user, we can create the two roles we need for Alice and Bob.

The first role “writeOnly” will be for Alice:

> use admin
> db.createRole({
    role  : "writeOnly",
    privileges : [
      {
        resource : { db : "gameofthrones", collection : "characters" },
        actions  : [ "insert" ]
      }
    ],
      roles : []
    })

And the second one for Bob:

> use admin
> db.createRole({
    role  : "noSpoilers",
    privileges : [
      {
        resource : { db : "gameofthrones", collection : "charactersNoSpoil" },
        actions  : [ "find" ]
      }
    ],
      roles : []
    })

Now we can create our two users:

> use admin
> db.createUser({ user : "Alice", pwd : "a", roles : ["writeOnly"] })
> db.createUser({ user : "Bob", pwd : "b", roles : ["noSpoilers"] })

Verifications

Alice has only been granted the right to insert into the collection “characters”.

$ mongo -u Alice -p a --authenticationDatabase admin gameofthrones
> db.characters.insertOne({
  "firstName": "Catelyn", 
  "lastName": "Stark", 
  "actor": "Michelle Fairley", 
  "gender": "female", 
  "status": "dead"
})
==> Succeeds
> db.characters.findOne()
==> Returns an error: unauthorized.
> db.charactersNoSpoil.findOne()
==> Returns an error: unauthorized.

Bob has only been granted the right to read the view so he cannot be spoiled.

$ mongo -u Bob -p b --authenticationDatabase admin gameofthrones
> db.charactersNoSpoil.findOne()
{
    "firstName" : "Daenerys",
    "lastName" : "Targaryen",
    "house" : "Targaryen",
    "status" : "*****"
}
> db.characters.findOne()
==> Returns an error: unauthorized.
> db.characters.insertOne({
  "firstName": "Jorah", 
  "lastName": "Mormont", 
  "actor": "Iain Glen", 
  "gender": "male", 
  "status": "alive"
})
==> Returns an error: unauthorized.

Audit the View

Now that the view is correctly set up, it’s important to audit the access and actions on the view.

The auditing capability is only available in MongoDB Enterprise Advanced, which is free for testing environments and downloadable at https://www.mongodb.com/download-center#enterprise

By default, the audit feature is not activated in MongoDB. It has to be enabled at MongoDB start-up by adding options on the command line or by modifying the configuration file.

Here is the command line I am using to start my MongoDB server now:

$ mongod --dbpath /tmp/data \
--auth \
--setParameter auditAuthorizationSuccess=true \
--auditDestination file \
--auditFormat JSON \
--auditPath /tmp/audit.json \
--auditFilter '{ atype: "authCheck", "param.ns": "gameofthrones.charactersNoSpoil", "param.command": { $in: [ "find", "insert", "delete", "update", "findandmodify" ] } }' 

Or the equivalent with a MongoDB configuration file:

storage:
   dbPath: /tmp/data

security:
   authorization: enabled

auditLog:
   destination: file
   format: JSON
   path: /tmp/audit.json
   filter: '{ atype: "authCheck", "param.ns": "gameofthrones.charactersNoSpoil", "param.command": { $in: [ "find", "insert", "delete", "update", "findandmodify" ] } }'

setParameter:
   auditAuthorizationSuccess: true

Thanks to these lines, the audit system will push information to a JSON file. It could also be sent to syslog for processing purposes.
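
If you would rather use syslog, switching the destination in the configuration file should be enough (a minimal sketch; note that the “format” and “path” options only apply to the file destination):

auditLog:
   destination: syslog
   filter: '{ atype: "authCheck", "param.ns": "gameofthrones.charactersNoSpoil" }'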

Now perform a findOne() with Bob on the view, and you will get the following result in /tmp/audit.json.

{  
   "atype":"authCheck",
   "ts":{  
      "$date":"2018-07-18T19:46:28.252+0200"
   },
   "local":{  
      "ip":"127.0.0.1",
      "port":27017
   },
   "remote":{  
      "ip":"127.0.0.1",
      "port":41322
   },
   "users":[  
      {  
         "user":"Bob",
         "db":"gameofthrones"
      }
   ],
   "roles":[  
      {  
         "role":"noSpoilers",
         "db":"gameofthrones"
      }
   ],
   "param":{  
      "command":"find",
      "ns":"gameofthrones.charactersNoSpoil",
      "args":{  
         "find":"charactersNoSpoil",
         "filter":{  

         },
         "limit":1,
         "singleBatch":true,
         "$db":"gameofthrones"
      }
   },
   "result":0
}

Here you can see that the user Bob performed a successful find (“result” : 0) on the charactersNoSpoil view.

When you perform the same query with Alice, the “result” code will be 13 – “Unauthorized to perform the operation”.

{  
   "atype":"authCheck",
   "ts":{  
      "$date":"2018-07-18T20:00:12.117+0200"
   },
   "local":{  
      "ip":"127.0.0.1",
      "port":1234
   },
   "remote":{  
      "ip":"127.0.0.1",
      "port":41390
   },
   "users":[  
      {  
         "user":"Alice",
         "db":"gameofthrones"
      }
   ],
   "roles":[  
      {  
         "role":"writeOnly",
         "db":"gameofthrones"
      }
   ],
   "param":{  
      "command":"find",
      "ns":"gameofthrones.charactersNoSpoil",
      "args":{  
         "find":"charactersNoSpoil",
         "filter":{  

         },
         "limit":1,
         "singleBatch":true,
         "$db":"gameofthrones"
      }
   },
   "result":13
}

You can read the documentation about the audit message here.

Next Steps

Thanks for taking the time to read my post. I hope you found it useful and interesting.

If you are looking for a very simple way to get started with MongoDB, you can do that in just 5 clicks on our MongoDB Atlas database service in the cloud. And if you want to learn more about how MongoDB’s security features better help you comply with new regulations, take a look at our guide GDPR: Impact to Your Data Management Landscape.

Also, we recently released MongoDB 4.0 with a lot of great features like multi-document ACID transactions. If you want to learn more, take our free course on MongoDB University M040: New Features and Tools in MongoDB 4.0 and read our guide to what’s new in MongoDB 4.0 where you can learn more about native type conversions, new visualization and analytics tools, and Kubernetes integration.

Sources

Pseudonymization Definition: https://en.wikipedia.org/wiki/Pseudonymization

Data Set for this example: https://s3-eu-west-1.amazonaws.com/developer-advocacy-public/blog/gameofthrones.characters.json

Java and MongoDB 4.0 Support for Multi-Document ACID Transactions

Introduction

MongoDB 4.0 adds support for multi-document ACID transactions.

But wait... Does that mean MongoDB did not support transactions until now? No, actually MongoDB has always supported transactions in the form of single document transactions. MongoDB 4.0 extends these transactional guarantees across multiple documents, multiple statements, multiple collections, and multiple databases. What good would a database be without any form of transactional data integrity guarantee?

Before we dive into this blog post, you can find all the code and try multi-document ACID transactions here.

Quick start

Step 1: Start MongoDB

Start a single-node MongoDB replica set, version 4.0.0 minimum, on localhost, port 27017.

If you use Docker:

  • You can use start-mongo.sh.
  • When you are done, you can use stop-mongo.sh.
  • If you want to connect to MongoDB with the Mongo Shell, you can use connect-mongo.sh.

If you prefer to start mongod manually:

  • mkdir /tmp/data && mongod --dbpath /tmp/data --replSet rs
  • mongo --eval 'rs.initiate()'

Step 2: Start Java

This demo contains two main programs: ChangeStreams.java and Transactions.java.

  • Change Streams allow you to be notified of any data changes within a MongoDB collection or database.
  • The Transaction process is the demo itself.

You need two shells to run them.

If you use Docker:

First shell:

./compile-docker.sh
./change-streams-docker.sh

Second shell:

./transactions-docker.sh

If you do not use Docker, you will need to install Maven 3.5.X and JDK 10 (or JDK 8 minimum, but you will need to update the Java versions in the pom.xml):

First shell:

./compile.sh
./change-streams.sh

Second shell:

./transactions.sh

Let’s compare our existing single document transactions with MongoDB 4.0’s ACID compliant multi-document transactions and see how we can leverage this new feature with Java.

Prior to MongoDB 4.0

Even in MongoDB 3.6 and earlier, every write operation is represented as a transaction scoped to the level of an individual document in the storage layer. Because the document model brings together related data that would otherwise be modeled across separate parent-child tables in a tabular schema, MongoDB’s atomic single-document operations provide transaction semantics that meet the data integrity needs of the majority of applications.

Every typical write operation modifying multiple documents actually happens in several independent transactions: one for each document.

Let’s take an example with a very simple stock management application.

First of all, I need a MongoDB replica set, so please follow the instructions given above to start MongoDB.

Now let’s insert the following documents into a product collection:

MongoDB Enterprise rs:PRIMARY> db.product.insertMany([
    { "_id" : "beer", "price" : NumberDecimal("3.75"), "stock" : NumberInt(5) }, 
    { "_id" : "wine", "price" : NumberDecimal("7.5"), "stock" : NumberInt(3) }
])

Let’s imagine there is a sale on and we want to offer our customers a 20% discount on all our products.

But before applying this discount, we want to monitor when these operations are happening in MongoDB with Change Streams.

Execute the following in Mongo Shell:

cursor = db.product.watch([{$match: {operationType: "update"}}]);
while (!cursor.isExhausted()) {
  if (cursor.hasNext()) {
    print(tojson(cursor.next()));
  }
}

Keep this shell on the side, open another Mongo Shell and apply the discount:

PRIMARY> db.product.updateMany({}, {$mul: {price:0.8}})
{ "acknowledged" : true, "matchedCount" : 2, "modifiedCount" : 2 }
PRIMARY> db.product.find().pretty()
{
    "_id" : "beer",
    "price" : NumberDecimal("3.00000000000000000"),
    "stock" : 5
}
{
    "_id" : "wine",
    "price" : NumberDecimal("6.0000000000000000"),
    "stock" : 3
}

As you can see, both documents were updated with a single command line but not in a single transaction. Here is what we can see in the Change Stream shell:

{
    "_id" : {
        "_data" : "825B4637290000000129295A1004374DC58C611E4C8DA4E5EDE9CF309AC5463C5F6964003C62656572000004"
    },
    "operationType" : "update",
    "clusterTime" : Timestamp(1531328297, 1),
    "ns" : {
        "db" : "test",
        "coll" : "product"
    },
    "documentKey" : {
        "_id" : "beer"
    },
    "updateDescription" : {
        "updatedFields" : {
            "price" : NumberDecimal("3.00000000000000000")
        },
        "removedFields" : [ ]
    }
}
{
    "_id" : {
        "_data" : "825B4637290000000229295A1004374DC58C611E4C8DA4E5EDE9CF309AC5463C5F6964003C77696E65000004"
    },
    "operationType" : "update",
    "clusterTime" : Timestamp(1531328297, 2),
    "ns" : {
        "db" : "test",
        "coll" : "product"
    },
    "documentKey" : {
        "_id" : "wine"
    },
    "updateDescription" : {
        "updatedFields" : {
            "price" : NumberDecimal("6.0000000000000000")
        },
        "removedFields" : [ ]
    }
}

As you can see, the cluster times (see the clusterTime key) of the two operations are different: the operations occurred during the same second, but the counter of the timestamp has been incremented by one.

Thus, each document is updated one at a time here, and even if this happens really fast, someone else could read the documents while the update is running and see only one of the two products with the discount.

Most of the time, this is something you can tolerate in your MongoDB database because, as much as possible, we try to embed tightly linked or related data in the same document. As a result, two updates on the same document happen within a single transaction:

PRIMARY> db.product.update({_id: "wine"},{$inc: {stock:1}, $set: {description : "It’s the best wine on Earth"}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
PRIMARY> db.product.findOne({_id: "wine"})
{
    "_id" : "wine",
    "price" : NumberDecimal("6.0000000000000000"),
    "stock" : 4,
    "description" : "It’s the best wine on Earth"
}

However, sometimes, you cannot model all of your related data in a single document, and there are a lot of valid reasons for choosing not to embed documents.

MongoDB 4.0 with multi-document ACID transactions

Multi-document ACID transactions in MongoDB are very similar to what you probably already know from traditional relational databases.

MongoDB’s transactions are a conversational set of related operations that must atomically commit or fully roll back with all-or-nothing execution.

Transactions are used to make sure operations are atomic even across multiple collections or databases. Thus, with snapshot isolation reads, another user can only see all the operations or none of them.
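
Before diving into the Java code, here is what such a transaction looks like in the Mongo Shell (a minimal sketch reusing the product collection from above; error handling omitted):

var session = db.getMongo().startSession();
session.startTransaction();
var products = session.getDatabase("test").product;
// Both updates are applied atomically: they commit together or not at all
products.updateOne({ "_id" : "beer" }, { $inc : { stock : -1 } });
products.updateOne({ "_id" : "wine" }, { $inc : { stock : -1 } });
session.commitTransaction();
session.endSession();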

Let’s now add a shopping cart to our example.

For this example, 2 collections are required because we are dealing with 2 different business entities: the stock management and the shopping cart each client can create while shopping. The lifecycle of each document in these collections is different.

A document in the product collection represents an item I’m selling. It contains the current price of the product and the current stock. I created a POJO to represent it: Product.java.

{ "_id" : "beer", "price" : NumberDecimal("3"), "stock" : NumberInt(5) }

A shopping cart is created when a client adds their first item to the cart and is removed when the client proceeds to checkout or leaves the website. I created a POJO to represent it: Cart.java.

{
    "_id" : "Alice",
    "items" : [
        {
            "price" : NumberDecimal("3"),
            "productId" : "beer",
            "quantity" : NumberInt(2)
        }
    ]
}

The challenge here resides in the fact that I cannot sell more than I possess: if I have 5 beers to sell, I cannot have more than 5 beers distributed across the different client carts.

To ensure that, I have to make sure that the operation creating or updating the client cart is atomic with the stock update. That’s where multi-document transactions come into play. The transaction must fail if someone tries to buy something I do not have in stock. I will add a constraint on the product stock:

db.createCollection("product", {
   validator: {
      $jsonSchema: {
         bsonType: "object",
         required: [ "_id", "price", "stock" ],
         properties: {
            _id: {
               bsonType: "string",
               description: "must be a string and is required"
            },
            price: {
               bsonType: "decimal",
               minimum: 0,
               description: "must be a positive decimal and is required"
            },
            stock: {
               bsonType: "int",
               minimum: 0,
               description: "must be a positive integer and is required"
            }
         }
      }
   }
})

Note that this is already included in the Java code.

To monitor our example, we are going to use MongoDB Change Streams that were introduced in MongoDB 3.6.

In each of the threads of this process called ChangeStreams.java, I am going to monitor one of the 2 collections and print each operation with its associated cluster time.

// package and imports

public class ChangeStreams {

    private static final Bson filterUpdate = Filters.eq("operationType", "update");
    private static final Bson filterInsertUpdate = Filters.in("operationType", "insert", "update");
    private static final String jsonSchema = "{ $jsonSchema: { bsonType: \"object\", required: [ \"_id\", \"price\", \"stock\" ], properties: { _id: { bsonType: \"string\", description: \"must be a string and is required\" }, price: { bsonType: \"decimal\", minimum: 0, description: \"must be a positive decimal and is required\" }, stock: { bsonType: \"int\", minimum: 0, description: \"must be a positive integer and is required\" } } } } ";

    public static void main(String[] args) {
        MongoDatabase db = initMongoDB(args[0]);
        MongoCollection<Cart> cartCollection = db.getCollection("cart", Cart.class);
        MongoCollection<Product> productCollection = db.getCollection("product", Product.class);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> watchChangeStream(productCollection, filterUpdate));
        executor.submit(() -> watchChangeStream(cartCollection, filterInsertUpdate));
        ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
        scheduled.scheduleWithFixedDelay(System.out::println, 0, 1, TimeUnit.SECONDS);
    }

    private static void watchChangeStream(MongoCollection<?> collection, Bson filter) {
        System.out.println("Watching " + collection.getNamespace());
        List<Bson> pipeline = Collections.singletonList(Aggregates.match(filter));
        collection.watch(pipeline)
                  .fullDocument(FullDocument.UPDATE_LOOKUP)
                  .forEach((Consumer<ChangeStreamDocument<?>>) doc -> System.out.println(
                          doc.getClusterTime() + " => " + doc.getFullDocument()));
    }

    private static MongoDatabase initMongoDB(String mongodbURI) {
        getLogger("org.mongodb.driver").setLevel(Level.SEVERE);
        CodecRegistry providers = fromProviders(PojoCodecProvider.builder().register("com.mongodb.models").build());
        CodecRegistry codecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(), providers);
        MongoClientOptions.Builder options = new MongoClientOptions.Builder().codecRegistry(codecRegistry);
        MongoClientURI uri = new MongoClientURI(mongodbURI, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase db = client.getDatabase("test");
        db.drop();
        db.createCollection("cart");
        db.createCollection("product", productJsonSchemaValidator());
        return db;
    }

    private static CreateCollectionOptions productJsonSchemaValidator() {
        return new CreateCollectionOptions().validationOptions(
                new ValidationOptions().validationAction(ValidationAction.ERROR).validator(BsonDocument.parse(jsonSchema)));
    }
}

In this example, we have 5 beers to sell. Alice wants to buy 2 beers, but we are not going to use the new MongoDB 4.0 multi-document transactions for this. We will observe in the change streams two operations: one creating the cart and one updating the stock, at 2 different cluster times.

Then Alice adds 2 more beers to her cart, and this time we are going to use a transaction. The result in the change stream will be 2 operations happening at the same cluster time.

Finally, she will try to order 2 extra beers, but the jsonSchema validator will fail the product update and trigger a rollback, so we will not see anything in the change stream. Here is the Transactions.java source code:

// package and import

public class Transactions {

    private static MongoClient client;
    private static MongoCollection<Cart> cartCollection;
    private static MongoCollection<Product> productCollection;

    private final BigDecimal BEER_PRICE = BigDecimal.valueOf(3);
    private final String BEER_ID = "beer";

    private final Bson stockUpdate = inc("stock", -2);
    private final Bson filterId = eq("_id", BEER_ID);
    private final Bson filterAlice = eq("_id", "Alice");
    private final Bson matchBeer = elemMatch("items", eq("productId", "beer"));
    private final Bson incrementBeers = inc("items.$.quantity", 2);

    public static void main(String[] args) {
        initMongoDB(args[0]);
        new Transactions().demo();
    }

    private static void initMongoDB(String mongodbURI) {
        getLogger("org.mongodb.driver").setLevel(Level.SEVERE);
        CodecRegistry codecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(), fromProviders(
                PojoCodecProvider.builder().register("com.mongodb.models").build()));
        MongoClientOptions.Builder options = new MongoClientOptions.Builder().codecRegistry(codecRegistry);
        MongoClientURI uri = new MongoClientURI(mongodbURI, options);
        client = new MongoClient(uri);
        MongoDatabase db = client.getDatabase("test");
        cartCollection = db.getCollection("cart", Cart.class);
        productCollection = db.getCollection("product", Product.class);
    }

    private void demo() {
        clearCollections();
        insertProductBeer();
        printDatabaseState();
        System.out.println("#########  NO  TRANSACTION #########");
        System.out.println("Alice wants 2 beers.");
        System.out.println("We have to create a cart in the 'cart' collection and update the stock in the 'product' collection.");
        System.out.println("The 2 actions are correlated but can not be executed on the same cluster time.");
        System.out.println("Any error blocking one operation could result in stock error or beer sale we don't own.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoBeers();
        sleep();
        removingBeersFromStock();
        System.out.println("####################################\n");
        printDatabaseState();
        sleep();
        System.out.println("\n######### WITH TRANSACTION #########");
        System.out.println("Alice wants 2 extra beers.");
        System.out.println("Now we can update the 2 collections simultaneously.");
        System.out.println("The 2 operations only happen when the transaction is committed.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback();
        sleep();
        System.out.println("\n######### WITH TRANSACTION #########");
        System.out.println("Alice wants 2 extra beers.");
        System.out.println("This time we do not have enough beers in stock so the transaction will rollback.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback();
        client.close();
    }

    private void aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback() {
        ClientSession session = client.startSession();
        try {
            session.startTransaction(TransactionOptions.builder().writeConcern(WriteConcern.MAJORITY).build());
            aliceWantsTwoExtraBeers(session);
            sleep();
            removingBeerFromStock(session);
            session.commitTransaction();
        } catch (MongoCommandException e) {
            session.abortTransaction();
            System.out.println("####### ROLLBACK TRANSACTION #######");
        } finally {
            session.close();
            System.out.println("####################################\n");
            printDatabaseState();
        }
    }

    private void removingBeersFromStock() {
        System.out.println("Trying to update beer stock : -2 beers.");
        try {
            productCollection.updateOne(filterId, stockUpdate);
        } catch (MongoCommandException e) {
            System.out.println("#####   MongoCommandException  #####");
            System.out.println("##### STOCK CANNOT BE NEGATIVE #####");
            throw e;
        }
    }

    private void removingBeerFromStock(ClientSession session) {
        System.out.println("Trying to update beer stock : -2 beers.");
        try {
            productCollection.updateOne(session, filterId, stockUpdate);
        } catch (MongoCommandException e) {
            System.out.println("#####   MongoCommandException  #####");
            System.out.println("##### STOCK CANNOT BE NEGATIVE #####");
            throw e;
        }
    }

    private void aliceWantsTwoBeers() {
        System.out.println("Alice adds 2 beers in her cart.");
        cartCollection.insertOne(new Cart("Alice", Collections.singletonList(new Cart.Item(BEER_ID, 2, BEER_PRICE))));
    }

    private void aliceWantsTwoExtraBeers(ClientSession session) {
        System.out.println("Updating Alice cart : adding 2 beers.");
        cartCollection.updateOne(session, and(filterAlice, matchBeer), incrementBeers);
    }

    private void insertProductBeer() {
        productCollection.insertOne(new Product(BEER_ID, 5, BEER_PRICE));
    }

    private void clearCollections() {
        productCollection.deleteMany(new BsonDocument());
        cartCollection.deleteMany(new BsonDocument());
    }

    private void printDatabaseState() {
        System.out.println("Database state:");
        printProducts(productCollection.find().into(new ArrayList<>()));
        printCarts(cartCollection.find().into(new ArrayList<>()));
        System.out.println();
    }

    private void printProducts(List<Product> products) {
        products.forEach(System.out::println);
    }

    private void printCarts(List<Cart> carts) {
        if (carts.isEmpty())
            System.out.println("No carts...");
        else
            carts.forEach(System.out::println);
    }

    private void sleep() {
        System.out.println("Sleeping 3 seconds...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            System.err.println("Oups...");
            e.printStackTrace();
        }
    }
}

Here is the console of the Change Stream:

$ ./change-streams.sh 

Watching test.cart
Watching test.product

Timestamp{value=6570052721557110786, seconds=1529709604, inc=2} => Cart{id='Alice', items=[Item{productId=beer, quantity=2, price=3}]}



Timestamp{value=6570052734442012673, seconds=1529709607, inc=1} => Product{id='beer', stock=3, price=3}






Timestamp{value=6570052764506783745, seconds=1529709614, inc=1} => Product{id='beer', stock=1, price=3}
Timestamp{value=6570052764506783745, seconds=1529709614, inc=1} => Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}

As you can see here, we only get four operations because the last two operations were never committed to the database, and therefore the change stream has nothing to show.

You can also note that the first two cluster times are different because we did not use a transaction for the first two operations, while the last two operations share the same cluster time because we used the new MongoDB 4.0 multi-document transaction system, and thus they are atomic.

Here is the console of the Transactions java process, which sums up everything I said earlier.

$ ./transactions.sh 
Database state:
Product{id='beer', stock=5, price=3}
No carts...

#########  NO  TRANSACTION #########
Alice wants 2 beers.
We have to create a cart in the 'cart' collection and update the stock in the 'product' collection.
The 2 actions are correlated but can not be executed on the same cluster time.
Any error blocking one operation could result in stock error or a sale of beer that we can’t fulfill as we have no stock.
---------------------------------------------------------------------------
Alice adds 2 beers in her cart.
Sleeping 3 seconds...
Trying to update beer stock : -2 beers.
####################################

Database state:
Product{id='beer', stock=3, price=3}
Cart{id='Alice', items=[Item{productId=beer, quantity=2, price=3}]}

Sleeping 3 seconds...

######### WITH TRANSACTION #########
Alice wants 2 extra beers.
Now we can update the 2 collections simultaneously.
The 2 operations only happen when the transaction is committed.
---------------------------------------------------------------------------
Updating Alice cart : adding 2 beers.
Sleeping 3 seconds...
Trying to update beer stock : -2 beers.
####################################

Database state:
Product{id='beer', stock=1, price=3}
Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}

Sleeping 3 seconds...

######### WITH TRANSACTION #########
Alice wants 2 extra beers.
This time we do not have enough beers in stock so the transaction will rollback.
---------------------------------------------------------------------------
Updating Alice cart : adding 2 beers.
Sleeping 3 seconds...
Trying to update beer stock : -2 beers.
#####   MongoCommandException  #####
##### STOCK CANNOT BE NEGATIVE #####
####### ROLLBACK TRANSACTION #######
####################################

Database state:
Product{id='beer', stock=1, price=3}
Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}

Next Steps

Thanks for taking the time to read my post - I hope you found it useful and interesting. As a reminder, all the code is available in this GitHub repository for you to experiment with.

If you are looking for a very simple way to get started with MongoDB, you can do that in just 5 clicks on our MongoDB Atlas database service in the cloud.

Also, multi-document ACID transactions are not the only new feature in MongoDB 4.0, so feel free to take a look at our free course on MongoDB University M040: New Features and Tools in MongoDB 4.0 and our guide to what’s new in MongoDB 4.0 where you can learn more about native type conversions, new visualization and analytics tools, and Kubernetes integration.