GIANT Stories at MongoDB

Pseudonymization with MongoDB Views: The solution for GDPR and Game of Thrones spoilers

GDPR is now in effect in Europe, and you may still be wondering how you are going to handle it. No worries: MongoDB Views are here to help, but first, a little definition.

Definition

“Pseudonymization is a data management procedure by which personally identifiable information fields within a data record are replaced by one or more artificial identifiers, or pseudonyms. A single pseudonym for each replaced field or collection of replaced fields makes the data record less identifiable while remaining suitable for data analysis and data processing.”
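
To make the definition concrete, here is a minimal, illustrative sketch in plain JavaScript (not MongoDB-specific; the field names and pseudonym format are made up for the example):

```javascript
// Replace the identifying fields of a record with a single artificial
// identifier (pseudonym), keeping the non-identifying fields intact.
function pseudonymize(record, piiFields, pseudonym) {
  const out = { pseudonym };
  for (const [key, value] of Object.entries(record)) {
    if (!piiFields.includes(key)) out[key] = value;
  }
  return out;
}

const record = { firstName: "Daenerys", lastName: "Targaryen", house: "Targaryen" };
pseudonymize(record, ["firstName", "lastName"], "subject-0042");
// → { pseudonym: "subject-0042", house: "Targaryen" }
```

The record remains suitable for analysis (grouping by house still works) while no longer naming the person directly.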

MongoDB Views

A view in MongoDB is a read-only projection of an existing collection or another view. The purpose of a view is to provide an alternative way to present documents. Because a view is generated with an Aggregation Pipeline, a projection can pseudonymize each field. Consequently, it is possible to create several views, based on use cases, while keeping a golden copy of the data in the master collection.
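
Conceptually, a view behaves like a stored, read-only transformation applied to the source collection each time the view is queried. A rough sketch in plain JavaScript (illustrative only; `makeView` is not a MongoDB API):

```javascript
// A "view" here is just a stored pipeline applied lazily to the source data.
// Reads go through the pipeline; there is no way to write through it.
function makeView(source, pipeline) {
  return { find: () => pipeline.reduce((docs, stage) => docs.map(stage), source) };
}

const characters = [{ firstName: "Jon", lastName: "Snow", status: "alive" }];
const noSpoil = makeView(characters, [
  doc => ({ firstName: doc.firstName, status: "*****" }) // pseudonymizing projection
]);
noSpoil.find();
// → [ { firstName: "Jon", status: "*****" } ]
```

Because the pipeline runs at query time, documents inserted into the source are immediately visible through the view, and the golden copy of the data is never modified.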

(Diagrams from the original post: view creation workflow, view usage, and considerations.)

Example

I want to show you a practical approach to pseudonymization using views in MongoDB. We will be using a Game of Thrones characters collection, with their status (dead/alive). Here is an example of one document:

{
    "_id" : ObjectId("5ae13de5f54a365254c62ad1"),
    "firstName" : "Daenerys",
    "lastName" : "Targaryen",
    "nickName" : "Khaleesi",
    "house" : "Targaryen",
    "actor" : "Emilia Clarke",
    "gender" : "female",
    "status" : "alive"
}

You can download the dataset here.

And you can import it directly from command line:

mongoimport --drop -d gameofthrones -c characters gameofthrones.characters.json

Explore the dataset and notice that the “status” is not always specified. Sometimes this key/value is missing and we need to take that into account in our view.

Because the “status” field is sensitive data full of spoilers, we want to hide it in a spoiler-free view.

Create the view

In this example, we use the following aggregation pipeline to generate the view:

> use gameofthrones
> db.characters.aggregate([
{ "$project" : 
    { 
        _id : 0,
        "firstName" : 1, 
        "lastName" : 1, 
        "house" : 1,
        "status" : 
        {
            "$cond": [ { "$eq": [ { "$type": "$status" }, "missing" ] }, "Not Specified", "*****" ]
        }    
    } 
} 
])

This aggregation pipeline projects the collection and replaces the content of the “status” field with “*****” whenever the field exists. If the field is missing, “status” will read “Not Specified”.
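
The conditional logic can be mirrored in plain JavaScript to make its semantics explicit (illustrative only; this is not how the server evaluates the pipeline):

```javascript
// What the $project stage computes for each document: mask "status" when
// present, and report "Not Specified" when the field is missing.
function projectCharacter(doc) {
  return {
    firstName: doc.firstName,
    lastName: doc.lastName,
    house: doc.house,
    status: doc.status === undefined ? "Not Specified" : "*****"
  };
}

projectCharacter({ firstName: "Arya", lastName: "Stark", house: "Stark" });
// → { firstName: "Arya", lastName: "Stark", house: "Stark", status: "Not Specified" }
```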

Let’s create a view called “charactersNoSpoil”, based on the “characters” collection. We can leverage the aggregation pipeline created in the previous step:

> use gameofthrones
> db.createView(
  "charactersNoSpoil", 
  "characters", 
  [{ 
     "$project" : 
       { 
         _id : 0,
         "firstName" : 1, 
         "lastName" : 1, 
         "house" : 1, 
         "status" : 
           {"$cond": [ { "$eq": [ { "$type": "$status" }, "missing" ] }, "Not Specified", "*****" ]}
        }
  }] 
)

With the view created, you can verify it with a “show collections”, a “db.getCollectionInfos()”, or a “db.charactersNoSpoil.find()” command.

> use gameofthrones
> db.charactersNoSpoil.find({},{firstName:1, status:1})
{ "firstName" : "Daenerys", "status" : "*****" }
{ "firstName" : "Joffrey", "status" : "*****" }
{ "firstName" : "Robert", "status" : "*****" }
{ "firstName" : "Tyron", "status" : "Not Specified" }
{ "firstName" : "Jaime", "status" : "*****" }
{ "firstName" : "Jon", "status" : "*****" }
{ "firstName" : "Cersei", "status" : "*****" }
{ "firstName" : "Sansa", "status" : "*****" }
{ "firstName" : "Tywin", "status" : "Not Specified" }
{ "firstName" : "Arya", "status" : "Not Specified" }
{ "firstName" : "Eddard", "status" : "Not Specified" }

If you are using MongoDB Compass, you will see the same result there.

Working with Views

As mentioned in the introduction, views are read-only and use the indexes of the source collection.

Insert a new document

To add a new document to the view, you have to insert it into the original collection. The new document will then automatically be accessible through the view.

> db.characters.insertOne({
  "firstName": "Hodor", 
  "actor": "Kristian Nairn", 
  "gender": "male", 
  "status": "dead"
})

Then check that this document appears within the “charactersNoSpoil” view.

> db.charactersNoSpoil.find({firstName: "Hodor"}) 
{ "firstName" : "Hodor", "status" : "*****" }

Create an Index

To create an index that can be used by the view, you have to create the index on the underlying collection.

> db.characters.createIndex({"house":1})
> db.charactersNoSpoil.find({house: "Stark"}).explain()

Extract of the winning plan:
"winningPlan" : {
    "stage" : "FETCH",
    "inputStage" : {
        "stage" : "IXSCAN",
        "keyPattern" : {"house" : 1},
        "indexName" : "house_1",
        "isMultiKey" : false,
        "multiKeyPaths" : {"house" : [ ]},
        "isUnique" : false,
        "isSparse" : false,
        "isPartial" : false,
        "indexVersion" : 2,
        "direction" : "forward",
        "indexBounds" : {"house" : ["[\"Stark\", \"Stark\"]"]}
    }
}

As you can see, the explain plan shows that the find on our view uses the {“house”:1} index.

Set Up Access Control

Now that the view is created, it is important to segregate who can read the view and who can write to the master collection.

To implement this example, you will create users and roles to secure the data.

  • Alice should only be able to write in the master collection,
  • Bob should only be able to read the view.

By doing this, you will prevent unauthorized reads of the “characters” collection, and thus avoid spoilers :-).

Before starting

If you have not done so already, you need to enable Role-Based Access Control on your MongoDB server by adding “--auth” on the command line or by adding the following lines to your “/etc/mongod.conf” file.

security:
 authorization: enabled

Then you need to connect, create an admin user and log in as this new admin user.

> use admin
> db.createUser( { user: "admin", pwd: "safepassword", roles:["root"] })
> db.auth("admin","safepassword")

If you want to know more about our security recommendations for running MongoDB in production, have a look at our security checklist.

Create the roles

Now that we have a super user, we can create the two roles we need for Alice and Bob.

The first role “writeOnly” will be for Alice:

> use admin
> db.createRole({
    role  : "writeOnly",
    privileges : [
      {
        resource : { db : "gameofthrones", collection : "characters" },
        actions  : [ "insert" ]
      }
    ],
      roles : []
    })

And the second one for Bob:

> use admin
> db.createRole({
    role  : "noSpoilers",
    privileges : [
      {
        resource : { db : "gameofthrones", collection : "charactersNoSpoil" },
        actions  : [ "find" ]
      }
    ],
      roles : []
    })

Now we can create our two users:

> use admin
> db.createUser({ user : "Alice", pwd : "a", roles : ["writeOnly"] })
> db.createUser({ user : "Bob", pwd : "b", roles : ["noSpoilers"] })

Verifications

Alice has only been granted the right to insert into the collection “characters”.

$ mongo -u Alice -p a --authenticationDatabase admin gameofthrones
> db.characters.insertOne({
  "firstName": "Catelyn", 
  "lastName": "Stark", 
  "actor": "Michelle Fairley", 
  "gender": "female", 
  "status": "dead"
})
==> Succeeds
> db.characters.findOne()
==> Returns an error: unauthorized.
> db.charactersNoSpoil.findOne()
==> Returns an error: unauthorized.

Bob has only been granted the right to read the view so he cannot be spoiled.

$ mongo -u Bob -p b --authenticationDatabase admin gameofthrones
> db.charactersNoSpoil.findOne()
{
    "firstName" : "Daenerys",
    "lastName" : "Targaryen",
    "house" : "Targaryen",
    "status" : "*****"
}
> db.characters.findOne()
==> Returns an error: unauthorized.
> db.characters.insertOne({
  "firstName": "Jorah", 
  "lastName": "Mormont", 
  "actor": "Iain Glen", 
  "gender": "male", 
  "status": "alive"
})
==> Returns an error: unauthorized.

Audit the View

Now that the view is correctly set up, it is important to audit access and actions on the view.

The auditing capability is only available in MongoDB Enterprise Advanced, which is free for testing environments and downloadable at https://www.mongodb.com/download-center#enterprise.

By default, auditing is not activated in MongoDB. It has to be enabled at start-up, either with command-line options or in the configuration file.

Here is the command line I am using to start my MongoDB server now:

$ mongod --dbpath /tmp/data \
--auth \
--setParameter auditAuthorizationSuccess=true \
--auditDestination file \
--auditFormat JSON \
--auditPath /tmp/audit.json \
--auditFilter '{ atype: "authCheck", "param.ns": "gameofthrones.charactersNoSpoil", "param.command": { $in: [ "find", "insert", "delete", "update", "findandmodify" ] } }' 

Or the equivalent with a MongoDB configuration file:

storage:
   dbPath: /tmp/data

security:
   authorization: enabled

auditLog:
   destination: file
   format: JSON
   path: /tmp/audit.json
   filter: '{ atype: "authCheck", "param.ns": "gameofthrones.charactersNoSpoil", "param.command": { $in: [ "find", "insert", "delete", "update", "findandmodify" ] } }'

setParameter:
   auditAuthorizationSuccess: true

Thanks to these lines, the audit system will write events to a JSON file. These events could also be sent to syslog for further processing.

Now perform a findOne() with Bob on the view, and you get the following entry in /tmp/audit.json.

{  
   "atype":"authCheck",
   "ts":{  
      "$date":"2018-07-18T19:46:28.252+0200"
   },
   "local":{  
      "ip":"127.0.0.1",
      "port":27017
   },
   "remote":{  
      "ip":"127.0.0.1",
      "port":41322
   },
   "users":[  
      {  
         "user":"Bob",
         "db":"gameofthrones"
      }
   ],
   "roles":[  
      {  
         "role":"noSpoilers",
         "db":"gameofthrones"
      }
   ],
   "param":{  
      "command":"find",
      "ns":"gameofthrones.charactersNoSpoil",
      "args":{  
         "find":"charactersNoSpoil",
         "filter":{  

         },
         "limit":1,
         "singleBatch":true,
         "$db":"gameofthrones"
      }
   },
   "result":0
}

Here you can see that the user Bob performed a successful find (“result” : 0) on the charactersNoSpoil view.

When you perform the same with Alice, the “result” code is 13 (“Unauthorized to perform the operation”).

{  
   "atype":"authCheck",
   "ts":{  
      "$date":"2018-07-18T20:00:12.117+0200"
   },
   "local":{  
      "ip":"127.0.0.1",
      "port":1234
   },
   "remote":{  
      "ip":"127.0.0.1",
      "port":41390
   },
   "users":[  
      {  
         "user":"Alice",
         "db":"gameofthrones"
      }
   ],
   "roles":[  
      {  
         "role":"writeOnly",
         "db":"gameofthrones"
      }
   ],
   "param":{  
      "command":"find",
      "ns":"gameofthrones.charactersNoSpoil",
      "args":{  
         "find":"charactersNoSpoil",
         "filter":{  

         },
         "limit":1,
         "singleBatch":true,
         "$db":"gameofthrones"
      }
   },
   "result":13
}

You can read the documentation about the audit message here.

Next Steps

Thanks for taking the time to read my post. I hope you found it useful and interesting.

If you are looking for a very simple way to get started with MongoDB, you can do that in just 5 clicks on our MongoDB Atlas database service in the cloud. And if you want to learn more about how MongoDB’s security features better help you comply with new regulations, take a look at our guide GDPR: Impact to Your Data Management Landscape.

Also, we recently released MongoDB 4.0 with a lot of great features like multi-document ACID transactions. If you want to learn more, take our free course on MongoDB University M040: New Features and Tools in MongoDB 4.0 and read our guide to what’s new in MongoDB 4.0 where you can learn more about native type conversions, new visualization and analytics tools, and Kubernetes integration.

Sources

Pseudonymization Definition: https://en.wikipedia.org/wiki/Pseudonymization

Data Set for this example: https://s3-eu-west-1.amazonaws.com/developer-advocacy-public/blog/gameofthrones.characters.json

Java and MongoDB 4.0 Support for Multi-Document ACID Transactions

Introduction

MongoDB 4.0 adds support for multi-document ACID transactions.

But wait... Does that mean MongoDB did not support transactions until now? No: MongoDB has always supported transactions in the form of single-document transactions. MongoDB 4.0 extends these transactional guarantees across multiple documents, multiple statements, multiple collections, and multiple databases. What good would a database be without any form of transactional data integrity guarantee?

Before we dive into this blog post, you can find all the code and try multi-document ACID transactions here.

Quick start

Step 1: Start MongoDB

Start a single-node MongoDB replica set, version 4.0.0 or higher, on localhost, port 27017.

If you use Docker:

  • You can use start-mongo.sh.
  • When you are done, you can use stop-mongo.sh.
  • If you want to connect to MongoDB with the Mongo Shell, you can use connect-mongo.sh.

If you prefer to start mongod manually:

  • mkdir /tmp/data && mongod --dbpath /tmp/data --replSet rs
  • mongo --eval 'rs.initiate()'

Step 2: Start Java

This demo contains two main programs: ChangeStreams.java and Transactions.java.

  • Change Streams allow you to be notified of any data changes within a MongoDB collection or database.
  • The Transaction process is the demo itself.

You need two shells to run them.

If you use Docker:

First shell:

./compile-docker.sh
./change-streams-docker.sh

Second shell:

./transactions-docker.sh

If you do not use Docker, you will need to install Maven 3.5.X and JDK 10 (or at minimum JDK 8, but then you will need to update the Java versions in the pom.xml):

First shell:

./compile.sh
./change-streams.sh

Second shell:

./transactions.sh

Let’s compare our existing single document transactions with MongoDB 4.0’s ACID compliant multi-document transactions and see how we can leverage this new feature with Java.

Prior to MongoDB 4.0

Even in MongoDB 3.6 and earlier, every write operation is represented as a transaction scoped to the level of an individual document in the storage layer. Because the document model brings together related data that would otherwise be modeled across separate parent-child tables in a tabular schema, MongoDB’s atomic single-document operations provide transaction semantics that meet the data integrity needs of the majority of applications.

Every typical write operation modifying multiple documents actually happens in several independent transactions: one for each document.

Let’s take an example with a very simple stock management application.

First of all, I need a MongoDB Replica Set so please follow the instructions given above to start MongoDB.

Now let’s insert the following documents into a product collection:

MongoDB Enterprise rs:PRIMARY> db.product.insertMany([
    { "_id" : "beer", "price" : NumberDecimal("3.75"), "stock" : NumberInt(5) }, 
    { "_id" : "wine", "price" : NumberDecimal("7.5"), "stock" : NumberInt(3) }
])

Let’s imagine there is a sale on and we want to offer our customers a 20% discount on all our products.

But before applying this discount, we want to monitor when these operations are happening in MongoDB with Change Streams.

Execute the following in Mongo Shell:

cursor = db.product.watch([{$match: {operationType: "update"}}]);
while (!cursor.isExhausted()) {
  if (cursor.hasNext()) {
    print(tojson(cursor.next()));
  }
}

Keep this shell on the side, open another Mongo Shell and apply the discount:

PRIMARY> db.product.updateMany({}, {$mul: {price:0.8}})
{ "acknowledged" : true, "matchedCount" : 2, "modifiedCount" : 2 }
PRIMARY> db.product.find().pretty()
{
    "_id" : "beer",
    "price" : NumberDecimal("3.00000000000000000"),
    "stock" : 5
}
{
    "_id" : "wine",
    "price" : NumberDecimal("6.0000000000000000"),
    "stock" : 3
}

As you can see, both documents were updated with a single command, but not in a single transaction. Here is what we can see in the Change Stream shell:

{
    "_id" : {
        "_data" : "825B4637290000000129295A1004374DC58C611E4C8DA4E5EDE9CF309AC5463C5F6964003C62656572000004"
    },
    "operationType" : "update",
    "clusterTime" : Timestamp(1531328297, 1),
    "ns" : {
        "db" : "test",
        "coll" : "product"
    },
    "documentKey" : {
        "_id" : "beer"
    },
    "updateDescription" : {
        "updatedFields" : {
            "price" : NumberDecimal("3.00000000000000000")
        },
        "removedFields" : [ ]
    }
}
{
    "_id" : {
        "_data" : "825B4637290000000229295A1004374DC58C611E4C8DA4E5EDE9CF309AC5463C5F6964003C77696E65000004"
    },
    "operationType" : "update",
    "clusterTime" : Timestamp(1531328297, 2),
    "ns" : {
        "db" : "test",
        "coll" : "product"
    },
    "documentKey" : {
        "_id" : "wine"
    },
    "updateDescription" : {
        "updatedFields" : {
            "price" : NumberDecimal("6.0000000000000000")
        },
        "removedFields" : [ ]
    }
}

As you can see, the cluster times (see the clusterTime key) of the two operations are different: the operations occurred during the same second, but the timestamp’s counter was incremented by one.

Thus here each document is updated one at a time and even if this happens really fast, someone else could read the documents while the update is running and see only one of the two products with the discount.
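
The race described above can be sketched in plain JavaScript (illustrative only): each document gets its own update, so a read issued between the two updates sees a half-discounted catalog.

```javascript
const products = [
  { _id: "beer", price: 3.75 },
  { _id: "wine", price: 7.5 }
];

// Apply the 20% discount one document at a time, as the server does,
// and simulate a concurrent reader arriving between the two updates.
const seenBetweenUpdates = [];
products.forEach((product, i) => {
  product.price = product.price * 0.8;   // independent per-document update
  if (i === 0) {
    seenBetweenUpdates.push(products.map(p => p.price)); // reader snapshot
  }
});
// seenBetweenUpdates[0] → [ 3, 7.5 ]: beer already discounted, wine not yet
```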

Most of the time, it is something you can tolerate in your MongoDB database because, as much as possible, we try to embed tightly linked, or related, data in the same document. As a result, two updates on the same document happen within a single transaction:

PRIMARY> db.product.update({_id: "wine"},{$inc: {stock:1}, $set: {description : "It’s the best wine on Earth"}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
PRIMARY> db.product.findOne({_id: "wine"})
{
    "_id" : "wine",
    "price" : NumberDecimal("6.0000000000000000"),
    "stock" : 4,
    "description" : "It’s the best wine on Earth"
}

However, sometimes, you cannot model all of your related data in a single document, and there are a lot of valid reasons for choosing not to embed documents.

MongoDB 4.0 with multi-document ACID transactions

Multi-document ACID transactions in MongoDB are very similar to what you probably already know from traditional relational databases.

MongoDB’s transactions are a conversational set of related operations that must atomically commit or fully roll back, with all-or-nothing execution.

Transactions are used to make sure operations are atomic even across multiple collections or databases. Thanks to snapshot isolation, another user sees either all of the operations or none of them.

Let’s now add a shopping cart to our example.

For this example, 2 collections are required because we are dealing with 2 different business entities: stock management and the shopping cart each client creates while shopping. The lifecycle of each document in these collections is different.

A document in the product collection represents an item I’m selling. This contains the current price of the product and the current stock. I created a POJO to represent it: Product.java.

{ "_id" : "beer", "price" : NumberDecimal("3"), "stock" : NumberInt(5) }

A shopping cart is created when a client adds their first item to the cart, and it is removed when the client proceeds to checkout or leaves the website. I created a POJO to represent it: Cart.java.

{
    "_id" : "Alice",
    "items" : [
        {
            "price" : NumberDecimal("3"),
            "productId" : "beer",
            "quantity" : NumberInt(2)
        }
    ]
}

The challenge here resides in the fact that I cannot sell more than I possess: if I have 5 beers to sell, I cannot have more than 5 beers distributed across the different client carts.

To ensure that, I have to make sure that the operation creating or updating the client cart is atomic with the stock update. That’s where multi-document transactions come into play. The transaction must fail if someone tries to buy something I do not have in stock. I will add a constraint on the product stock:

db.createCollection("product", {
   validator: {
      $jsonSchema: {
         bsonType: "object",
         required: [ "_id", "price", "stock" ],
         properties: {
            _id: {
               bsonType: "string",
               description: "must be a string and is required"
            },
            price: {
               bsonType: "decimal",
               minimum: 0,
               description: "must be a positive decimal and is required"
            },
            stock: {
               bsonType: "int",
               minimum: 0,
               description: "must be a positive integer and is required"
            }
         }
      }
   }
})

Note that this is already included in the Java code.
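
The all-or-nothing behavior the transaction provides can be sketched in plain JavaScript (illustrative only; this is not driver code): both writes are staged together and only published on commit, and a failed stock constraint discards both.

```javascript
// Stage both writes (cart + stock) on a copy of the state; publish the copy
// only if the stock constraint holds, otherwise keep the previous state.
function buyBeers(state, quantity) {
  const staged = JSON.parse(JSON.stringify(state)); // the "session" workspace
  staged.cart.quantity += quantity;                 // write 1: update the cart
  staged.product.stock -= quantity;                 // write 2: update the stock
  if (staged.product.stock < 0) return state;       // "rollback": discard both writes
  return staged;                                    // "commit": publish both writes
}

let state = { product: { _id: "beer", stock: 5 }, cart: { _id: "Alice", quantity: 0 } };
state = buyBeers(state, 2); // commits: stock 3, quantity 2
state = buyBeers(state, 2); // commits: stock 1, quantity 4
state = buyBeers(state, 2); // rolls back: state unchanged
// state.product.stock → 1, state.cart.quantity → 4
```

No observer ever sees the cart updated without the matching stock decrement, which is exactly what the change stream demonstrates below.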

To monitor our example, we are going to use MongoDB Change Streams that were introduced in MongoDB 3.6.

In each of the threads of this process called ChangeStreams.java, I am going to monitor one of the 2 collections and print each operation with its associated cluster time.

// package and imports

public class ChangeStreams {

    private static final Bson filterUpdate = Filters.eq("operationType", "update");
    private static final Bson filterInsertUpdate = Filters.in("operationType", "insert", "update");
    private static final String jsonSchema = "{ $jsonSchema: { bsonType: \"object\", required: [ \"_id\", \"price\", \"stock\" ], properties: { _id: { bsonType: \"string\", description: \"must be a string and is required\" }, price: { bsonType: \"decimal\", minimum: 0, description: \"must be a positive decimal and is required\" }, stock: { bsonType: \"int\", minimum: 0, description: \"must be a positive integer and is required\" } } } } ";

    public static void main(String[] args) {
        MongoDatabase db = initMongoDB(args[0]);
        MongoCollection<Cart> cartCollection = db.getCollection("cart", Cart.class);
        MongoCollection<Product> productCollection = db.getCollection("product", Product.class);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> watchChangeStream(productCollection, filterUpdate));
        executor.submit(() -> watchChangeStream(cartCollection, filterInsertUpdate));
        ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
        scheduled.scheduleWithFixedDelay(System.out::println, 0, 1, TimeUnit.SECONDS);
    }

    private static void watchChangeStream(MongoCollection<?> collection, Bson filter) {
        System.out.println("Watching " + collection.getNamespace());
        List<Bson> pipeline = Collections.singletonList(Aggregates.match(filter));
        collection.watch(pipeline)
                  .fullDocument(FullDocument.UPDATE_LOOKUP)
                  .forEach((Consumer<ChangeStreamDocument<?>>) doc -> System.out.println(
                          doc.getClusterTime() + " => " + doc.getFullDocument()));
    }

    private static MongoDatabase initMongoDB(String mongodbURI) {
        getLogger("org.mongodb.driver").setLevel(Level.SEVERE);
        CodecRegistry providers = fromProviders(PojoCodecProvider.builder().register("com.mongodb.models").build());
        CodecRegistry codecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(), providers);
        MongoClientOptions.Builder options = new MongoClientOptions.Builder().codecRegistry(codecRegistry);
        MongoClientURI uri = new MongoClientURI(mongodbURI, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase db = client.getDatabase("test");
        db.drop();
        db.createCollection("cart");
        db.createCollection("product", productJsonSchemaValidator());
        return db;
    }

    private static CreateCollectionOptions productJsonSchemaValidator() {
        return new CreateCollectionOptions().validationOptions(
                new ValidationOptions().validationAction(ValidationAction.ERROR).validator(BsonDocument.parse(jsonSchema)));
    }
}

In this example, we have 5 beers to sell. Alice wants to buy 2 beers, but we are not going to use the new MongoDB 4.0 multi-document transactions for this. We will observe two operations in the change streams: one creating the cart and one updating the stock, at 2 different cluster times.

Then Alice adds 2 more beers to her cart, and this time we are going to use a transaction. The result in the change stream will be 2 operations happening at the same cluster time.

Finally, she will try to order 2 extra beers, but the jsonSchema validator will fail the product update, resulting in a rollback, so we will not see anything in the change stream. Here is the Transactions.java source code:

// package and import

public class Transactions {

    private static MongoClient client;
    private static MongoCollection<Cart> cartCollection;
    private static MongoCollection<Product> productCollection;

    private final BigDecimal BEER_PRICE = BigDecimal.valueOf(3);
    private final String BEER_ID = "beer";

    private final Bson stockUpdate = inc("stock", -2);
    private final Bson filterId = eq("_id", BEER_ID);
    private final Bson filterAlice = eq("_id", "Alice");
    private final Bson matchBeer = elemMatch("items", eq("productId", "beer"));
    private final Bson incrementBeers = inc("items.$.quantity", 2);

    public static void main(String[] args) {
        initMongoDB(args[0]);
        new Transactions().demo();
    }

    private static void initMongoDB(String mongodbURI) {
        getLogger("org.mongodb.driver").setLevel(Level.SEVERE);
        CodecRegistry codecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(), fromProviders(
                PojoCodecProvider.builder().register("com.mongodb.models").build()));
        MongoClientOptions.Builder options = new MongoClientOptions.Builder().codecRegistry(codecRegistry);
        MongoClientURI uri = new MongoClientURI(mongodbURI, options);
        client = new MongoClient(uri);
        MongoDatabase db = client.getDatabase("test");
        cartCollection = db.getCollection("cart", Cart.class);
        productCollection = db.getCollection("product", Product.class);
    }

    private void demo() {
        clearCollections();
        insertProductBeer();
        printDatabaseState();
        System.out.println("#########  NO  TRANSACTION #########");
        System.out.println("Alice wants 2 beers.");
        System.out.println("We have to create a cart in the 'cart' collection and update the stock in the 'product' collection.");
        System.out.println("The 2 actions are correlated but can not be executed on the same cluster time.");
        System.out.println("Any error blocking one operation could leave the stock wrong or sell beer we do not have.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoBeers();
        sleep();
        removingBeersFromStock();
        System.out.println("####################################\n");
        printDatabaseState();
        sleep();
        System.out.println("\n######### WITH TRANSACTION #########");
        System.out.println("Alice wants 2 extra beers.");
        System.out.println("Now we can update the 2 collections simultaneously.");
        System.out.println("The 2 operations only happen when the transaction is committed.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback();
        sleep();
        System.out.println("\n######### WITH TRANSACTION #########");
        System.out.println("Alice wants 2 extra beers.");
        System.out.println("This time we do not have enough beers in stock so the transaction will rollback.");
        System.out.println("---------------------------------------------------------------------------");
        aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback();
        client.close();
    }

    private void aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback() {
        ClientSession session = client.startSession();
        try {
            session.startTransaction(TransactionOptions.builder().writeConcern(WriteConcern.MAJORITY).build());
            aliceWantsTwoExtraBeers(session);
            sleep();
            removingBeerFromStock(session);
            session.commitTransaction();
        } catch (MongoCommandException e) {
            session.abortTransaction();
            System.out.println("####### ROLLBACK TRANSACTION #######");
        } finally {
            session.close();
            System.out.println("####################################\n");
            printDatabaseState();
        }
    }

    private void removingBeersFromStock() {
        System.out.println("Trying to update beer stock : -2 beers.");
        try {
            productCollection.updateOne(filterId, stockUpdate);
        } catch (MongoCommandException e) {
            System.out.println("#####   MongoCommandException  #####");
            System.out.println("##### STOCK CANNOT BE NEGATIVE #####");
            throw e;
        }
    }

    private void removingBeerFromStock(ClientSession session) {
        System.out.println("Trying to update beer stock : -2 beers.");
        try {
            productCollection.updateOne(session, filterId, stockUpdate);
        } catch (MongoCommandException e) {
            System.out.println("#####   MongoCommandException  #####");
            System.out.println("##### STOCK CANNOT BE NEGATIVE #####");
            throw e;
        }
    }

    private void aliceWantsTwoBeers() {
        System.out.println("Alice adds 2 beers in her cart.");
        cartCollection.insertOne(new Cart("Alice", Collections.singletonList(new Cart.Item(BEER_ID, 2, BEER_PRICE))));
    }

    private void aliceWantsTwoExtraBeers(ClientSession session) {
        System.out.println("Updating Alice cart : adding 2 beers.");
        cartCollection.updateOne(session, and(filterAlice, matchBeer), incrementBeers);
    }

    private void insertProductBeer() {
        productCollection.insertOne(new Product(BEER_ID, 5, BEER_PRICE));
    }

    private void clearCollections() {
        productCollection.deleteMany(new BsonDocument());
        cartCollection.deleteMany(new BsonDocument());
    }

    private void printDatabaseState() {
        System.out.println("Database state:");
        printProducts(productCollection.find().into(new ArrayList<>()));
        printCarts(cartCollection.find().into(new ArrayList<>()));
        System.out.println();
    }

    private void printProducts(List<Product> products) {
        products.forEach(System.out::println);
    }

    private void printCarts(List<Cart> carts) {
        if (carts.isEmpty())
            System.out.println("No carts...");
        else
            carts.forEach(System.out::println);
    }

    private void sleep() {
        System.out.println("Sleeping 3 seconds...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            System.err.println("Oups...");
            e.printStackTrace();
        }
    }
}

Here is the console output of the Change Stream process:

$ ./change-streams.sh 

Watching test.cart
Watching test.product

Timestamp{value=6570052721557110786, seconds=1529709604, inc=2} => Cart{id='Alice', items=[Item{productId=beer, quantity=2, price=3}]}



Timestamp{value=6570052734442012673, seconds=1529709607, inc=1} => Product{id='beer', stock=3, price=3}






Timestamp{value=6570052764506783745, seconds=1529709614, inc=1} => Product{id='beer', stock=1, price=3}
Timestamp{value=6570052764506783745, seconds=1529709614, inc=1} => Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}

As you can see here, we only get four operations because the last two operations were never committed to the database, and therefore the change stream has nothing to show.

You can also note that the first two cluster times are different because we did not use a transaction for the first two operations, while the last two operations share the same cluster time because we used the new MongoDB 4.0 multi-document transaction system, and they are therefore atomic.
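
As an aside, the Timestamp{value, seconds, inc} triple printed by the Java driver is redundant: the raw 64-bit value packs the seconds into its high 32 bits and the increment counter into its low 32 bits. A quick check in plain JavaScript using BigInt:

```javascript
// Split a raw BSON timestamp value into its (seconds, inc) components.
function decodeBsonTimestamp(rawValue) {
  const v = BigInt(rawValue);
  return { seconds: Number(v >> 32n), inc: Number(v & 0xFFFFFFFFn) };
}

decodeBsonTimestamp("6570052764506783745");
// → { seconds: 1529709614, inc: 1 }  (the cluster time shared by both writes above)
```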

Here is the console output of the Transactions.java process, which sums up everything described above.

$ ./transactions.sh 
Database state:
Product{id='beer', stock=5, price=3}
No carts...

#########  NO  TRANSACTION #########
Alice wants 2 beers.
We have to create a cart in the 'cart' collection and update the stock in the 'product' collection.
The 2 actions are correlated but can not be executed on the same cluster time.
Any error blocking one operation could leave the stock wrong or sell beer we do not have.
---------------------------------------------------------------------------
Alice adds 2 beers in her cart.
Sleeping 3 seconds...
Trying to update beer stock : -2 beers.
####################################

Database state:
Product{id='beer', stock=3, price=3}
Cart{id='Alice', items=[Item{productId=beer, quantity=2, price=3}]}

Sleeping 3 seconds...

######### WITH TRANSACTION #########
Alice wants 2 extra beers.
Now we can update the 2 collections simultaneously.
The 2 operations only happen when the transaction is committed.
---------------------------------------------------------------------------
Updating Alice cart : adding 2 beers.
Sleeping 3 seconds...
Trying to update beer stock : -2 beers.
####################################

Database state:
Product{id='beer', stock=1, price=3}
Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}

Sleeping 3 seconds...

######### WITH TRANSACTION #########
Alice wants 2 extra beers.
This time we do not have enough beers in stock so the transaction will rollback.
---------------------------------------------------------------------------
Updating Alice cart : adding 2 beers.
Sleeping 3 seconds...
Trying to update beer stock : -2 beers.
#####   MongoCommandException  #####
##### STOCK CANNOT BE NEGATIVE #####
####### ROLLBACK TRANSACTION #######
####################################

Database state:
Product{id='beer', stock=1, price=3}
Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}

Next Steps

Thanks for taking the time to read my post - I hope you found it useful and interesting. As a reminder, all the code is available in this GitHub repository for you to experiment with.

If you are looking for a very simple way to get started with MongoDB, you can do that in just 5 clicks on our MongoDB Atlas database service in the cloud.

Also, multi-document ACID transactions are not the only new feature in MongoDB 4.0, so feel free to take a look at our free course on MongoDB University M040: New Features and Tools in MongoDB 4.0 and our guide to what’s new in MongoDB 4.0, where you can learn more about native type conversions, new visualization and analytics tools, and Kubernetes integration.