Pitfalls and Workarounds for Tailing the Oplog on a MongoDB Sharded Cluster

Henrik Ingo


This article was accurate at the time of its publication on 9 June 2015. As we continue to evolve our products and features, certain technologies change. With regard to this post, the information about capped collections and sharding is no longer current. For a more up-to-date article, please see the newer Introduction to Change Streams post.


Two months ago I wrote about how you can tail the oplog on a sharded cluster, too, filtering out the internal inserts and deletes arising from the balancer process.

After it was published, I received more feedback on the topic, pointing out two scenarios that are still problematic for applications that need to tail the oplog. The first also applies to non-sharded clusters; the second is specific to sharding. Both are perhaps more obscure than what was discussed in the first blog post, but still very real.

Failovers and rollbacks

The first issue occurs due to failovers. Say you're tailing the oplog of a primary that experiences a network issue, causing another node to be elected as the new primary while the old primary eventually steps down. In this situation, the process tailing the oplog may have read events that were never replicated to another node. Once a new primary has been elected, those events are not part of the current state of the database. Essentially, they never happened - yet the process tailing the oplog believes they did.

(Note that when the old primary at some point wants to re-join the replica set, it will first have to rollback the events that weren't replicated and aren't part of the database anymore.)

The picture below illustrates this sequence of events. Note that events D and E do not exist in the database's end state, but the Observer believes they do.

Fortunately, there are several solutions you can use to get correct reads even in the face of failovers and rollbacks:

One helpful property of failovers is that they force all client connections to close, so clients have to reconnect and re-discover the primary in the replica set. As part of that reconnection, an application can also take steps to prevent the unwanted situation described above.

  • A simple solution, probably useful for applications like Meteor, is to reload the data model as if the application had been restarted, then continue tailing the oplog on the new primary as usual. The only thing to watch out for is that this doesn't cause a spike of load when all app instances suddenly need to issue a lot of queries to reload their data models at once. There are various ways one could mitigate that, but they are out of scope for this blog post.
  • ETL and replication systems typically have an internal buffer of events waiting to be replicated. In many cases it may be sufficient to stop replication, check the buffer against the oplog on the new primary, and if necessary remove any operations that disappeared in the failover. If more events disappeared (i.e. were rolled back) than exist in the ETL/replication tool's buffer, the tool should simply stop with an error and let the user fix the situation and restart. Note that the buffer could be enlarged on purpose to minimize the likelihood of that ever happening.
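The buffer-trimming idea in the second bullet can be sketched as pure logic. This is a hypothetical illustration, not an existing tool's API; it assumes each buffered event carries the oplog's ts and h identity fields:

```python
def trim_rolled_back(buffer, new_primary_oplog_ids):
    """Drop buffered events that no longer exist on the new primary.

    buffer: list of event dicts, oldest first, each carrying 'ts' and 'h'.
    new_primary_oplog_ids: set of (ts, h) pairs read from the new primary's oplog.

    Returns the surviving events, or raises if the rollback reaches past the
    start of the buffer, in which case the tool can no longer repair itself.
    """
    surviving = [e for e in buffer if (e["ts"], e["h"]) in new_primary_oplog_ids]
    if len(surviving) == len(buffer):
        return surviving  # nothing was rolled back
    # The oldest buffered event must still exist on the new primary;
    # otherwise the rollback extends beyond what we can verify.
    oldest = buffer[0]
    if (oldest["ts"], oldest["h"]) not in new_primary_oplog_ids:
        raise RuntimeError("rollback exceeds replication buffer; manual fix needed")
    return surviving
```

A larger buffer simply widens the window of rollbacks this check can absorb before it has to give up and ask for human intervention.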

Finally, a completely different approach would be to tail the oplogs of a majority, or even all, of the nodes in a replica set. Since the pair of ts and h fields uniquely identifies each operation, it is easy to merge the results from each oplog on the application side so that the "output" of the tailing thread is only the events returned by at least a majority of the MongoDB nodes. With this approach you don't need to care whether a node is a primary or a secondary: you simply tail the oplog of every node, and any event returned by a majority of the oplogs is considered valid. Events that do not appear in a majority of the oplogs are skipped and discarded.
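The majority-merge approach can be sketched as follows. This is an illustrative sketch, not production code: each stream stands in for a tailing cursor on one node, events are counted by their (ts, h) identity, and an event is emitted as soon as a majority of nodes has reported it:

```python
from collections import defaultdict

def majority_merge(oplog_streams):
    """Merge oplog events tailed from every replica set member.

    oplog_streams: one iterable per node, each yielding oplog entries
    as dicts containing at least 'ts' and 'h'.

    Yields each event exactly once, as soon as a majority of nodes has
    returned it. Events never reaching a majority are silently dropped.
    """
    majority = len(oplog_streams) // 2 + 1
    seen = defaultdict(int)   # (ts, h) -> how many nodes reported it
    emitted = set()
    # Round-robin over the streams; a real implementation would read
    # each tailable cursor on its own thread.
    iters = [iter(s) for s in oplog_streams]
    active = True
    while active:
        active = False
        for it in iters:
            for event in it:
                active = True
                key = (event["ts"], event["h"])
                seen[key] += 1
                if seen[key] >= majority and key not in emitted:
                    emitted.add(key)
                    yield event
                break  # take one event per stream per round
```

Note the trade-off: the merged stream only advances as fast as the majority-th slowest node, but it can never emit an event that would later be rolled back by a majority-elected primary.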

At MongoDB we are planning to improve the user experience for receiving change notifications by way of tailing the oplog. One improvement would be to encapsulate one or more of the above techniques so they are handled transparently by a library (such as the MongoDB connector). Yet another future solution will be SERVER-18022, which will allow reading data - in this case the oplog - from a snapshot that reflects the majority-committed state of the cluster.

Updates to orphan documents in a sharded cluster

In a sharded cluster, orphan documents are documents that exist on a shard even though, according to the shard key and the current chunk distribution, they should live on another shard at this point in time. (The current chunk distribution is stored on the config servers, in the config.chunks collection.)
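In principle, whether a given document on a given shard is an orphan can be decided from that chunk distribution: find the chunk whose shard-key range contains the document's key, and compare the chunk's owning shard to the shard where the document was found. A pure-logic sketch, with the chunk layout simplified to (min, max, shard) tuples (real config.chunks documents carry more fields):

```python
def is_orphan(shard_key_value, found_on_shard, chunks):
    """Return True if a document with this shard key value is an orphan
    on the shard where it was found.

    chunks: list of (min_key, max_key, shard) tuples describing the
    current chunk distribution, with min inclusive and max exclusive.
    """
    for min_key, max_key, shard in chunks:
        if min_key <= shard_key_value < max_key:
            return shard != found_on_shard
    # No chunk covers this key at all: metadata is inconsistent,
    # so treat the document as an orphan.
    return True
```

This is essentially the check mongos performs on behalf of a find(); the trouble described below is the code path that skips it.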

Even though orphan documents - by their very definition - shouldn't exist, they can appear, and they are harmless. For example, they can appear due to an interrupted chunk migration: documents were inserted into the new shard but, for some reason, never deleted from the old one.

In most cases MongoDB handles their existence correctly. For example, if you connect to a sharded database via mongos and do a find(), each mongod process will filter out from the result set any orphan documents it encounters. (The same document may be returned by another mongod, where its existence is valid according to the current chunk distribution.) On the other hand, if you connect directly to the replica set and do the same find(), you will see the orphan document. You can even insert() a document with an out-of-range shard key value directly into a node, to artificially create an orphan document for yourself.

One case where orphan documents unfortunately are currently not detected and filtered out is a multi-update (the collection name here is illustrative):

   db.people.update(
      { age: { $gte: 65 } },
      { $set: { seniorCitizen: true } },
      { multi: true }
   )

If such a multi-update hits an orphan document, the orphan document will be updated, and the update will be recorded in the oplog and replicated. Hence, if you're tailing the oplog in a sharded cluster, you could see updates that, from a cluster-wide point of view, are ghost updates - they never happened and shouldn't be visible to the outside world.

Unfortunately, I'm not aware of any generic and robust way to work around this issue. For some applications you can minimize the risk of orphan documents ever appearing by turning off the balancer process and distributing chunks manually.
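Turning off the balancer and placing chunks by hand looks roughly like this in the mongo shell (the namespace and shard name are illustrative, and these commands must be run against a mongos):

```javascript
// Stop the balancer so no further automatic chunk migrations occur.
sh.stopBalancer()

// Verify that it is off.
sh.getBalancerState()   // should now return false

// Distribute chunks by hand instead, for example:
sh.moveChunk("mydb.people", { age: 40 }, "shard0001")
```

With the balancer disabled, migrations only happen when you initiate them, so you can schedule them for quiet periods and verify their completion, greatly reducing the window in which an interrupted migration could leave orphans behind.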

Fundamentally, this is an issue that needs to be solved in the MongoDB code base: a multi-update should detect and skip orphan documents. As part of improving the user experience for change notification use cases, we will also have to solve this problem somehow. (Solutions are being discussed, but I won't go into details in this post, as my focus here was on listing solutions and workarounds that can be applied today.)
