
Pitfalls and Workarounds for Tailing the Oplog on a MongoDB Sharded Cluster

Introduction

Two months ago I wrote about how you can tail the oplog on a sharded cluster as well, and filter out the internal inserts and deletes arising from the balancer process.

After it was published I received more feedback on the topic, pointing out two scenarios that are still problematic for applications that need to tail the oplog. The first actually applies to non-sharded clusters too; the second is again a sharding-related issue. Both are perhaps a bit more obscure than what was discussed in the first blog post, but still very real.

Failovers and rollbacks

The first issue is caused by failovers. Say you're tailing the oplog from a primary that experiences a network issue, causing another node to be elected as the new primary while the old primary eventually steps down. In this situation the process tailing the oplog may have read events that were never replicated to another node. Once the new primary has been elected, those events are no longer part of the current state of the database. Essentially, they never happened, yet the process tailing the oplog believes they did.

(Note that when the old primary eventually wants to re-join the replica set, it will first have to roll back the events that weren't replicated and are no longer part of the database.)

The picture below illustrates this sequence of events. Note that events D and E do not exist in the end state of the database, but the Observer believes they do.

Fortunately, there are several solutions you can use to read the oplog correctly even in the face of failovers and rollbacks:

A useful property of failovers is that they force all client connections to be closed, so clients have to reconnect and re-discover the primary of the replica set. As part of this process, one can also take steps to prevent the unwanted situation described above.

  • A simple solution, which would probably suit applications like Meteor, is to reload the data model as if the application had been restarted, and then continue tailing the oplog on the new primary as usual. The only thing to worry about is that this shouldn't cause a spike of queries when many apps suddenly need to reload their data models at the same time. There are various ways to mitigate that, but they are out of scope for this blog post.
  • ETL and replication systems typically have some internal buffer containing the events to be replicated. In many cases it may be sufficient to stop replication, check the buffer against the oplog on the new primary, and if necessary remove any operations that disappeared in the failover (a minimal sketch of this check follows below). If the number of events that disappeared (i.e. were rolled back) is larger than what exists in the buffer of the ETL/replication tool, it should simply stop with an error and let the user resolve the situation and restart. Note that the buffer could be increased on purpose to minimize the likelihood of that ever happening.
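
To make the buffer check concrete, here is a minimal mongo shell JavaScript sketch. It assumes a hypothetical buffer array holding the tool's not-yet-applied oplog documents and a hypothetical host name for the new primary; the idea is simply to keep only the events whose ts and h pair still exists in the new primary's oplog.

// Hypothetical sketch: prune buffered events that were rolled back in a failover.
// "buffer" is assumed to be an array of oplog documents kept by the ETL/replication tool.
var newPrimaryOplog = new Mongo("new-primary-host:27017").getDB("local").oplog.rs;

buffer = buffer.filter(function (event) {
    // Keep the event only if the new primary's oplog still contains it.
    return newPrimaryOplog.count({ ts: event.ts, h: event.h }) === 1;
});
// If even the oldest buffered event is gone, the rollback was larger than the
// buffer, and the tool should stop with an error instead of continuing.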

Finally, a completely different approach is to tail the oplogs of a majority, or even all, of the nodes in a replica set. Since the pair of the ts and h fields uniquely identifies each transaction, it is easy to merge the results from each oplog on the application side, so that the "output" of the tailing thread consists only of events that have been returned by at least a majority of the MongoDB nodes. With this approach you don't need to care whether a node is a primary or a secondary: you just tail the oplog of all of them, and every event returned by a majority of the oplogs is considered valid. Events that do not appear in a majority of the oplogs are skipped and discarded.
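
To illustrate the idea, here is a minimal mongo shell sketch with hypothetical host names. It polls a tailable cursor on every member and only prints an event once a majority of members have returned the same ts and h pair; a real implementation would use one thread per cursor and prune the bookkeeping map.

// Hypothetical sketch: merge the oplogs of all replica set members and only
// accept events that have been seen on a majority of them.
var hosts = ["node1:27017", "node2:27017", "node3:27017"];   // assumed host names
var majority = Math.floor(hosts.length / 2) + 1;

var cursors = hosts.map(function (host) {
    var oplog = new Mongo(host).getDB("local").oplog.rs;
    return oplog.find({ fromMigrate: { $exists: false } })
                .addOption(DBQuery.Option.tailable)
                .addOption(DBQuery.Option.awaitData);
});

var seen = {};   // "ts/h" key -> number of members that returned this event
while (true) {
    cursors.forEach(function (c) {
        while (c.hasNext()) {
            var op = c.next();
            var key = op.ts.toString() + "/" + op.h.toString();
            seen[key] = (seen[key] || 0) + 1;
            if (seen[key] === majority) {
                printjson(op);   // the event is now majority-confirmed
            }
        }
    });
    sleep(100);   // crude round-robin polling, good enough for a sketch
}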

At MongoDB we are planning to improve the user experience for receiving change notifications by tailing the oplog. One improvement would be to encapsulate one or more of the above techniques so that they are handled transparently by a library (such as the MongoDB connector). Yet another future solution is SERVER-18022, which will allow reading data (in this case, the oplog) from a snapshot that reflects the majority-committed state of the cluster.

Updates to orphan documents in a sharded cluster

In a sharded cluster, orphan documents are documents that exist on a shard even though, according to the shard key and the current chunk distribution, they should live on another shard at this point in time. (The current chunk distribution is stored on the config servers, in the config.chunks collection.)
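
For example, you can inspect the chunk distribution for a collection through mongos (the collection name here is just an example):

mongos> use config
switched to db config
mongos> db.chunks.find( { ns : "test.mycollection" } ).pretty()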

Even though orphan documents, by their very definition, shouldn't exist, they can appear, and they are normally harmless. For example, they can appear due to an interrupted chunk migration: documents were inserted into a new shard but, for some reason, never deleted from the old one.

In most cases MongoDB handles their existence correctly. For example, if you connect to a sharded database via mongos and do a find(), each shard's mongod process will filter out any orphan documents it encounters from the result set. (The same document may be returned by the other mongod, where its existence is valid according to the current chunk distribution.) On the other hand, if you connect directly to the replica set and do the same find(), you will see the orphan document. You can even insert() a document with an out-of-range shard key value directly into the node, to artificially create an orphan document for yourself.
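
As an illustration, here is a hedged sketch of creating such an orphan by hand, assuming a collection sharded on _id where this particular shard does not own the chunk containing the inserted value (the port, names and value are just examples):

$ mongo --port 27018
shard01:PRIMARY> use test
switched to db test
shard01:PRIMARY> db.mycollection.insert( { _id : 4, data : "I am an orphan" } )
WriteResult({ "nInserted" : 1 })

Queries routed through mongos will now filter this document out, while a find() issued directly against the shard will return it.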

One case where orphan documents are unfortunately not currently detected and filtered out is a multi-update:

db.people.update(
   { age: { $gte: 65 } },
   { $set: { seniorCitizen: true } },
   { multi: true }
)

If such a multi-update hits an orphan document, the orphan document will be updated, and the update will be recorded in the oplog and replicated. Hence, if you're tailing the oplog in a sharded cluster, you can see updates that, from a cluster-wide point of view, are ghost updates: they never happened and shouldn't be visible to the outside.

Unfortunately I'm not aware of any generic and robust way to work around this issue. For some applications you can minimize the risk of orphan documents ever appearing by turning off the balancer process and distributing chunks manually.
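
A minimal sketch of that approach, using the standard sh helpers from the mongos shell (the collection, chunk and shard names are just examples):

mongos> sh.stopBalancer()
mongos> sh.getBalancerState()
false
mongos> sh.moveChunk( "test.mycollection", { _id : 1 }, "shard02" )

With the balancer disabled, chunks move only when you move them, so migration-related orphans become much less likely; the trade-off is that you are now responsible for keeping data evenly distributed across the shards.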

Fundamentally, this is an issue that needs to be solved in the MongoDB code base: a multi-update should detect and skip orphan documents. As part of improving the user experience for change notification use cases, we will also have to solve this problem somehow. (Solutions are being discussed, but I won't go into details in this post, as my focus here was on listing solutions and workarounds that can be applied today.)



Tailing the MongoDB Oplog on Sharded Clusters

Intro

Tailable cursors, and in particular tailing MongoDB's oplog, are a popular feature with many uses, such as real-time notifications of all the changes to your database. A tailable cursor is conceptually the same as the Unix *"tail -f"* command: once you've reached the end of the result set, the cursor is not closed; rather, it continues to wait for new data and returns it as it arrives.

Tailing the oplog is very simple for replica sets, but things get a little more complex with sharded clusters. In this post we explain how to tail MongoDB's oplog in a sharded cluster. In the next post we'll review the pitfalls and workarounds for tailing the oplog on a MongoDB sharded cluster.

Why tail the oplog?

Tailable cursors can be used on capped collections and are often used for publish-subscribe types of data flow. In particular, MongoDB's oplog, which we use internally for replication, is a capped collection, and secondaries use a tailable cursor on it to get the operations that are to be replicated.
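
For instance, you can build a simple publish-subscribe channel yourself with a capped collection and a tailable cursor. A minimal sketch (the collection name and size are arbitrary):

> db.createCollection( "events", { capped : true, size : 1048576 } )
> db.events.insert( { msg : "starting up" } )   // tailing an empty capped collection would kill the cursor immediately
> var c = db.events.find().addOption( DBQuery.Option.tailable ).addOption( DBQuery.Option.awaitData )
> c.next()       // returns the "starting up" document
> db.events.insert( { msg : "hello" } )
> c.hasNext()    // true: the cursor picks up the new insert
> c.next()       // returns the "hello" document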

Third-party tools in the ETL and heterogeneous replication domains can also read events from the MongoDB oplog. For example, the Mongo Connector and the MongoDB ElasticSearch River do exactly that.

But with such a powerful interface, there's more we can do than just replication! Reactive programming has become a dominant paradigm, especially in HTML5 / JavaScript applications. Several modern JavaScript frameworks will update the user interface immediately and automatically as you change a value in your data model.

Tailing a MongoDB collection, or the entire database by way of tailing the oplog, is a perfect match for such a programming model! It means the application server is notified in real time of any change happening in the entire database.

In fact, one wonderful JavaScript framework is already doing this: Meteor. They have a cool video demo on their site, check it out! This makes Meteor a full stack reactive platform: changes propagate automatically all the way from database to the UI.

Reading the oplog with a tailable cursor

Here's an example of how to create a tailable cursor from the mongo shell:


shard01:PRIMARY> c = db.oplog.rs.find( { fromMigrate : { $exists : false } } ).addOption( DBQuery.Option.tailable ).addOption( DBQuery.Option.awaitData )
{ "ts" : Timestamp(1422998530, 1), "h" : NumberLong(0), "v" : 2, "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : Timestamp(1422998574, 1), "h" : NumberLong("-6781014703318499311"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 1, "data" : "hello" } }
{ "ts" : Timestamp(1422998579, 1), "h" : NumberLong("-217362260421471244"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 3, "data" : "hello" } }
{ "ts" : Timestamp(1422998584, 1), "h" : NumberLong("7215322058367374253"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 5, "data" : "hello" } }
shard01:PRIMARY> c.hasNext()
true
shard01:PRIMARY> c.next()
{
    "ts" : Timestamp(1423049506, 1),
    "h" : NumberLong("5775895302295493166"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 12,
        "data" : "hello"
    }
}
shard01:PRIMARY> c.hasNext()
false

As you can see, when used from the shell the cursor does not actually wait forever; rather, it times out after a few seconds. You can then use the hasNext() and next() methods to check whether any new data has arrived. And it has!

You can of course pass any filter to find() to capture only the events you want. For example, this is what the tailing cursor opened by Meteor looks like:


meteor:PRIMARY> db.currentOp()
{
    "inprog" : [
        {
            "opid" : 345,
            "active" : true,
            "secs_running" : 4,
            "op" : "getmore",
            "ns" : "local.oplog.rs",
            "query" : {
                "ns" : {
                    "$regex" : "^meteor\\."
                },
                "$or" : [
                    {
                        "op" : {
                            "$in" : [
                                "i",
                                "u",
                                "d"
                            ]
                        }
                    },
                    {
                        "op" : "c",
                        "o.drop" : {
                            "$exists" : true
                        }
                    }
                ],
                "ts" : {
                    "$gt" : Timestamp(1422200128, 7)
                }
            },
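
The same filter can be written directly as a tailable find() on the oplog. Here is a hedged reconstruction; the namespace prefix and starting timestamp are taken from the output above and are of course specific to that deployment:

meteor:PRIMARY> use local
switched to db local
meteor:PRIMARY> db.oplog.rs.find( {
...     ns : { $regex : "^meteor\\." },
...     $or : [
...         { op : { $in : [ "i", "u", "d" ] } },
...         { op : "c", "o.drop" : { $exists : true } }
...     ],
...     ts : { $gt : Timestamp(1422200128, 7) }
... } ).addOption( DBQuery.Option.tailable ).addOption( DBQuery.Option.awaitData )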

Tailing the Oplog on sharded clusters

But what happens when you use sharding? Well, first of all, you'll have to tail the oplog of each shard separately.
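
One way to find those oplogs is to ask the cluster itself: the config database lists every shard and its replica set members, so a tailing process can connect to each shard's replica set in turn. A minimal sketch (the host names in the output will of course differ):

mongos> db.getSiblingDB("config").shards.find()
{ "_id" : "shard01", "host" : "shard01/host-a:27018,host-a:27019,host-a:27020" }
{ "_id" : "shard02", "host" : "shard02/host-a:27021,host-a:27022,host-a:27023" }

Each host field contains the replica set name and its members; connect to each replica set and tail its local.oplog.rs.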

Tailing each shard is doable, but there are more complications. In a sharded cluster the MongoDB balancer will occasionally move data from one shard to another. This means that on one shard you will see a bunch of deletes, and on another shard you'll simultaneously see the corresponding inserts. These are purely internal to MongoDB. If you were tailing the oplog to capture changes in the database, you most likely wouldn't want to see these internal events, and they might even confuse you. For example, a Meteor app tailing the oplogs on a sharded cluster might mysteriously delete some data all of a sudden!

Let me illustrate. First, let's set up a sharded cluster using the mlaunch utility:


$ mlaunch --sharded 2 --replicaset
launching: mongod on port 27018
launching: mongod on port 27019
launching: mongod on port 27020
launching: mongod on port 27021
launching: mongod on port 27022
launching: mongod on port 27023
launching: config server on port 27024
replica set 'shard01' initialized.
replica set 'shard02' initialized.
launching: mongos on port 27017
adding shards. can take up to 30 seconds...

Now I'll connect to the mongos, shard a collection and insert some data into it:


$ mongo
MongoDB shell version: 2.6.7
connecting to: test
mongos> sh.enableSharding( "test" )
{ "ok" : 1 }
mongos> sh.shardCollection( "test.mycollection", { _id : 1 }, true )
{ "collectionsharded" : "test.mycollection", "ok" : 1 }
mongos> db.mycollection.insert( { _id : 1, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 3, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 5, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 7, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 9, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 11, data : "hello" } )
WriteResult({ "nInserted" : 1 })

If we connect to the mongod on shard01, we can see that all the data is there. We can also see the insert events in the oplog:


$ mongo --port 27018
MongoDB shell version: 2.6.7
connecting to: 127.0.0.1:27018/test
shard01:PRIMARY> show collections
mycollection
system.indexes
shard01:PRIMARY> db.mycollection.find()
{ "_id" : 1, "data" : "hello" }
{ "_id" : 3, "data" : "hello" }
{ "_id" : 5, "data" : "hello" }
{ "_id" : 7, "data" : "hello" }
{ "_id" : 9, "data" : "hello" }
{ "_id" : 11, "data" : "hello" }
shard01:PRIMARY> use local
switched to db local
shard01:PRIMARY> show collections
me
oplog.rs
slaves
startup_log
system.indexes
system.replset
shard01:PRIMARY> db.oplog.rs.find().pretty()
{
    "ts" : Timestamp(1422998530, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "n",
    "ns" : "",
    "o" : {
        "msg" : "initiating set"
    }
}
{
    "ts" : Timestamp(1422998574, 1),
    "h" : NumberLong("-6781014703318499311"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 1,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998579, 1),
    "h" : NumberLong("-217362260421471244"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 3,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998584, 1),
    "h" : NumberLong("7215322058367374253"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 5,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998588, 1),
    "h" : NumberLong("-5372877897993278968"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 7,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998591, 1),
    "h" : NumberLong("-243188455606213719"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 9,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998597, 1),
    "h" : NumberLong("5040618552262309692"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 11,
        "data" : "hello"
    }
}

There's nothing on shard02 yet, because there is so little data that the balancer hasn't run. Let's split the data into two chunks; this will trigger a balancer round:


mongos> sh.status()
--- Sharding Status --- 
  sharding version: {
    "_id" : 1,
    "version" : 4,
    "minCompatibleVersion" : 4,
    "currentVersion" : 5,
    "clusterId" : ObjectId("54d13c0555c0347d23e33cdd")
}
  shards:
    {  "_id" : "shard01",  "host" : "shard01/hingo-sputnik:27018,hingo-sputnik:27019,hingo-sputnik:27020" }
    {  "_id" : "shard02",  "host" : "shard02/hingo-sputnik:27021,hingo-sputnik:27022,hingo-sputnik:27023" }
  databases:
    {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
    {  "_id" : "test",  "partitioned" : true,  "primary" : "shard01" }
        test.mycollection
            shard key: { "_id" : 1 }
            chunks:
                shard01    1
            { "_id" : { "$minKey" : 1 } } -->> { "_id" : { "$maxKey" : 1 } } on : shard01 Timestamp(1, 0) 

mongos> sh.splitAt( "test.mycollection", { _id : 6 } )
{ "ok" : 1 }

mongos> sh.status()
--- Sharding Status --- 
  sharding version: {
    "_id" : 1,
    "version" : 4,
    "minCompatibleVersion" : 4,
    "currentVersion" : 5,
    "clusterId" : ObjectId("54d13c0555c0347d23e33cdd")
}
  shards:
    {  "_id" : "shard01",  "host" : "shard01/hingo-sputnik:27018,hingo-sputnik:27019,hingo-sputnik:27020" }
    {  "_id" : "shard02",  "host" : "shard02/hingo-sputnik:27021,hingo-sputnik:27022,hingo-sputnik:27023" }
  databases:
    {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
    {  "_id" : "test",  "partitioned" : true,  "primary" : "shard01" }
        test.mycollection
            shard key: { "_id" : 1 }
            chunks:
                shard02    1
                shard01    1
            { "_id" : { "$minKey" : 1 } } -->> { "_id" : 6 } on : shard02 Timestamp(2, 0) 
            { "_id" : 6 } -->> { "_id" : { "$maxKey" : 1 } } on : shard01 Timestamp(2, 1) 

mongos> 

As you can see, the collection is now split into two chunks, and the balancer has done its job and spread them evenly across the shards. If we go back to shard01, we can see how half of the records disappeared ({ "op" : "d" } entries are deletions):


shard01:PRIMARY> use test
switched to db test
shard01:PRIMARY> db.mycollection.find()
{ "_id" : 7, "data" : "hello" }
{ "_id" : 9, "data" : "hello" }
{ "_id" : 11, "data" : "hello" }
shard01:PRIMARY> use local
switched to db local
shard01:PRIMARY> db.oplog.rs.find().pretty()
{
    "ts" : Timestamp(1422998530, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "n",
    "ns" : "",
    "o" : {
        "msg" : "initiating set"
    }
}
{
    "ts" : Timestamp(1422998574, 1),
    "h" : NumberLong("-6781014703318499311"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 1,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998579, 1),
    "h" : NumberLong("-217362260421471244"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 3,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998584, 1),
    "h" : NumberLong("7215322058367374253"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 5,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998588, 1),
    "h" : NumberLong("-5372877897993278968"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 7,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998591, 1),
    "h" : NumberLong("-243188455606213719"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 9,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998597, 1),
    "h" : NumberLong("5040618552262309692"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 11,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998892, 1),
    "h" : NumberLong("3056127588031004421"),
    "v" : 2,
    "op" : "d",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 1
    }
}
{
    "ts" : Timestamp(1422998892, 2),
    "h" : NumberLong("-7633416138502997855"),
    "v" : 2,
    "op" : "d",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 3
    }
}
{
    "ts" : Timestamp(1422998892, 3),
    "h" : NumberLong("1499304029305069766"),
    "v" : 2,
    "op" : "d",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 5
    }
}
shard01:PRIMARY> 

And on shard02 we can see the same records appearing:


$ mongo --port 27021
MongoDB shell version: 2.6.7
connecting to: 127.0.0.1:27021/test
shard02:PRIMARY> db.mycollection.find()
{ "_id" : 1, "data" : "hello" }
{ "_id" : 3, "data" : "hello" }
{ "_id" : 5, "data" : "hello" }
shard02:PRIMARY> use local
switched to db local
shard02:PRIMARY> db.oplog.rs.find().pretty()
{
    "ts" : Timestamp(1422998531, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "n",
    "ns" : "",
    "o" : {
        "msg" : "initiating set"
    }
}
{
    "ts" : Timestamp(1422998890, 1),
    "h" : NumberLong("-6780991630754185199"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.system.indexes",
    "fromMigrate" : true,
    "o" : {
        "v" : 1,
        "key" : {
            "_id" : 1
        },
        "name" : "_id_",
        "ns" : "test.mycollection"
    }
}
{
    "ts" : Timestamp(1422998890, 2),
    "h" : NumberLong("-165956952201849851"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 1,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998890, 3),
    "h" : NumberLong("-7432242710082771022"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 3,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998890, 4),
    "h" : NumberLong("6790671206092100026"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 5,
        "data" : "hello"
    }
}

If we again insert some more data...


mongos> db.mycollection.insert( { _id : 2, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 4, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 6, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 8, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 10, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.find()
{ "_id" : 1, "data" : "hello" }
{ "_id" : 7, "data" : "hello" }
{ "_id" : 3, "data" : "hello" }
{ "_id" : 9, "data" : "hello" }
{ "_id" : 5, "data" : "hello" }
{ "_id" : 11, "data" : "hello" }
{ "_id" : 2, "data" : "hello" }
{ "_id" : 6, "data" : "hello" }
{ "_id" : 4, "data" : "hello" }
{ "_id" : 8, "data" : "hello" }
{ "_id" : 10, "data" : "hello" }

...then these inserts appear as expected on *shard01*...


shard01:PRIMARY> use local
switched to db local
shard01:PRIMARY> db.oplog.rs.find().pretty()

...beginning is the same as above, omitted for brevity ...

{
    "ts" : Timestamp(1422998892, 3),
    "h" : NumberLong("1499304029305069766"),
    "v" : 2,
    "op" : "d",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 5
    }
}
{
    "ts" : Timestamp(1422999422, 1),
    "h" : NumberLong("-6691556866108433789"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 6,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422999426, 1),
    "h" : NumberLong("-3908881761176526422"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 8,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422999429, 1),
    "h" : NumberLong("-4997431625184830993"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 10,
        "data" : "hello"
    }
}
shard01:PRIMARY> 

...and *shard02*:


shard02:PRIMARY> use local
switched to db local
shard02:PRIMARY> db.oplog.rs.find().pretty()
{
    "ts" : Timestamp(1422998531, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "n",
    "ns" : "",
    "o" : {
        "msg" : "initiating set"
    }
}
{
    "ts" : Timestamp(1422998890, 1),
    "h" : NumberLong("-6780991630754185199"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.system.indexes",
    "fromMigrate" : true,
    "o" : {
        "v" : 1,
        "key" : {
            "_id" : 1
        },
        "name" : "_id_",
        "ns" : "test.mycollection"
    }
}
{
    "ts" : Timestamp(1422998890, 2),
    "h" : NumberLong("-165956952201849851"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 1,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998890, 3),
    "h" : NumberLong("-7432242710082771022"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 3,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998890, 4),
    "h" : NumberLong("6790671206092100026"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 5,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422999414, 1),
    "h" : NumberLong("8160426227798471967"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 2,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422999419, 1),
    "h" : NumberLong("-3554656302824563522"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 4,
        "data" : "hello"
    }
}
shard02:PRIMARY>

Separating internal operations from application operations

So if an application like Meteor were reading the above, it would certainly be challenging to figure out what the end state of the data model is. If we simply combine the oplog events from both shards, it looks as if there had been the following inserts and deletes:

Operation    _id
insert       1
insert       3
insert       5
insert       7
insert       9
insert       11
insert       1
insert       3
insert       5
delete       1
delete       3
delete       5
insert       2
insert       4
insert       6
insert       8
insert       10

So, given the above sequence, do _id's 1, 3 and 5 exist in the data or not?

Fortunately, it is possible to distinguish cluster-internal operations from application operations. You may have noticed that the operations caused by the migrations have a fromMigrate flag set:


{
    "ts" : Timestamp(1422998890, 2),
    "h" : NumberLong("-165956952201849851"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "fromMigrate" : true,
    "o" : {
        "_id" : 1,
        "data" : "hello"
    }
}

Since we are only interested in operations that actually alter the database state when considering the cluster as a whole, we can filter out everything where this flag is set. Note that the correct filter is { $exists : false } rather than an equality match on false: the flag is simply absent from ordinary operations, so matching on fromMigrate : false returns nothing, as the first query below shows:


shard01:PRIMARY> db.oplog.rs.find( { fromMigrate : false } ).pretty()
shard01:PRIMARY> db.oplog.rs.find( { fromMigrate : { $exists : false } } ).pretty()
{
    "ts" : Timestamp(1422998530, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "n",
    "ns" : "",
    "o" : {
        "msg" : "initiating set"
    }
}
{
    "ts" : Timestamp(1422998574, 1),
    "h" : NumberLong("-6781014703318499311"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 1,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998579, 1),
    "h" : NumberLong("-217362260421471244"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 3,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998584, 1),
    "h" : NumberLong("7215322058367374253"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 5,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998588, 1),
    "h" : NumberLong("-5372877897993278968"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 7,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998591, 1),
    "h" : NumberLong("-243188455606213719"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 9,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422998597, 1),
    "h" : NumberLong("5040618552262309692"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 11,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422999422, 1),
    "h" : NumberLong("-6691556866108433789"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 6,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422999426, 1),
    "h" : NumberLong("-3908881761176526422"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 8,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422999429, 1),
    "h" : NumberLong("-4997431625184830993"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 10,
        "data" : "hello"
    }
}
shard01:PRIMARY> 

And on shard02:


shard02:PRIMARY> db.oplog.rs.find( { fromMigrate : { $exists : false } } ).pretty()
{
    "ts" : Timestamp(1422998531, 1),
    "h" : NumberLong(0),
    "v" : 2,
    "op" : "n",
    "ns" : "",
    "o" : {
        "msg" : "initiating set"
    }
}
{
    "ts" : Timestamp(1422999414, 1),
    "h" : NumberLong("8160426227798471967"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 2,
        "data" : "hello"
    }
}
{
    "ts" : Timestamp(1422999419, 1),
    "h" : NumberLong("-3554656302824563522"),
    "v" : 2,
    "op" : "i",
    "ns" : "test.mycollection",
    "o" : {
        "_id" : 4,
        "data" : "hello"
    }
}
shard02:PRIMARY> 

Exactly what we want!

Continue to Part II on this topic: Pitfalls and Workarounds for Tailing the Oplog on a MongoDB Sharded Cluster.
