In the Loop with Hadoop - Take Advantage of Data Locality with mongo-hadoop
September 15, 2016 | Updated: December 8, 2016
Imagine that you’ve been tasked with a big question: count how many times each word occurs in the complete works of William Shakespeare. You’re doing this by hand, of course, as your forebears did, before the days of computing. You invite your friends. Cleverly, you divide the work among yourselves, with each participant being responsible for a particular range of his works: tragedies from A-F, comedies from K-Z, and so on. When everyone is done, you will merge the results into one complete view.
Your piece of the task is to count the words in The Tempest. Unfortunately, you don’t own your own copy of the play, and because you live in a monastic word-counting village, the nearest library is 200 km away! Now you think, it would have been a lot easier to count the words of a text I already had nearby.
That is the principle of data locality, one of the concepts that allow Spark and Hadoop jobs to run quickly. Each worker node in a Spark or Hadoop cluster is tasked with running computation on only a single slice of the overall input data, and typically this slice already resides physically on the worker node itself. Otherwise, a worker node has to request the data it is missing, which has to be sent over a network, likely increasing the amount of time the job will take to run.
Hadoop jobs commonly read input data from the Hadoop Distributed File System (HDFS), which splits files into logical blocks that are distributed among many data nodes. When we run a MapReduce job, or the equivalent in Hive, Pig, or Spark, these data nodes also become responsible for computation, and they can conveniently process those blocks which already reside on the node. However, what if we wanted to read input from somewhere else like MongoDB? MongoDB is not HDFS, so Hadoop data nodes will have to open a connection to the database. This is like going 200 km to to pick up a book! Is there any hope at all for preserving data locality?
When we have MongoDB deployed as a sharded cluster, there is! In the next few paragraphs, we’ll look into how this new feature works in mongo-hadoop 2.0.
A sharded collection in MongoDB is partitioned into multiple chunks, which are individually housed on any number of shards. When you use mongo-hadoop to read from MongoDB in this scenario, the Hadoop connector creates one InputSplit (a “slice” of input data) per chunk.
The key improvement in mongo-hadoop 2.0 rests on using the metadata that MongoDB maintains: the shard key range for every chunk, and what shard houses each chunk.
When using mongo-hadoop with a sharded cluster, the default splitting strategy is to use one chunk per InputSplit, which represents one “slice” on which a worker node does computation. Since InputSplits map directly to shard chunks, we can resolve a hostname for a shard that already has all the data needed to work with a given InputSplit.
However, we wouldn’t want to read directly from a shard, since MongoDB automatically moves chunks between shards to ensure a uniform distribution, and these moves can happen at any time.. This is why the correct way to read from a sharded cluster is through a mongos process, which ensures we read chunks from the correct shards.
The connector allows you to specify multiple addresses to mongos instances using the mongo.input.mongos_hosts setting.
hadoop jar … -Dmongo.input.mongos_hosts=mongodb://foo:27017,mongodb://…
If the connector is told to read from a sharded collection, and mongo.input.mongos_hosts is not blank, then the connector will try to redirect reads for an InputSplit to a mongos instance that’s running on the same host.
This means that we can accomplish local reads by putting a DataNode process (for Hadoop) or a Spark executor on the same node with a MongoDB shard and a mongos. In other words, we have an equal number of MongoDB shards and Hadoop or Spark worker nodes. We provide the addresses to all these mongos instances to the connector, so that the connector will automatically tag each InputSplit with the address of the mongos that lives together with the shard that holds the data for the split.
Keep in mind that you’ll need to set an appropriate read preference to be able to read from the shard server local to the mongos as well. A read preference mode of nearest, for example, will select the server with the shortest ping time (i.e. the server local to the machine the task is running on), regardless of whether the shard server is a primary or secondary in a replica set.
We have always recommended setting up a mongos instance on the same machine as a Hadoop worker when reading from a sharded collection, since reading from a local mongos is a single network hop. Now with mongo-hadoop 2.0, you can achieve data locality and faster processing by co-locating your MongoDB shards with your Hadoop/Spark worker nodes (see diagram above). If you’ve experienced high latency reading MongoDB from Hadoop, we encourage you give this new feature a try!