Luke Lovett


In the Loop with Hadoop - Take Advantage of Data Locality with mongo-hadoop

Imagine that you’ve been tasked with a big question: count how many times each word occurs in the complete works of William Shakespeare. You’re doing this by hand, of course, as your forebears did before the days of computing. You invite your friends. Cleverly, you divide the work among yourselves, with each participant responsible for a particular range of Shakespeare’s works: tragedies from A-F, comedies from K-Z, and so on. When everyone is done, you will merge the results into one complete view. Your piece of the task is to count the words in The Tempest. Unfortunately, you don’t own a copy of the play, and because you live in a monastic word-counting village, the nearest library is 200 km away! Now you think: it would have been a lot easier to count the words of a text I already had nearby.

That is the principle of data locality, one of the concepts that allow Spark and Hadoop jobs to run quickly. Each worker node in a Spark or Hadoop cluster is tasked with running computation on only a single slice of the overall input data, and typically this slice already resides physically on the worker node itself. Otherwise, the worker node has to request the data it is missing, which must be sent over the network, likely increasing the time the job takes to run.

Hadoop jobs commonly read input data from the Hadoop Distributed File System (HDFS), which splits files into blocks that are distributed among many data nodes. When we run a MapReduce job, or the equivalent in Hive, Pig, or Spark, these data nodes also become responsible for computation, and they can conveniently process the blocks that already reside on them. However, what if we wanted to read input from somewhere else, like MongoDB? MongoDB is not HDFS, so Hadoop data nodes have to open a connection to the database. This is like going 200 km to pick up a book! Is there any hope at all of preserving data locality?
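To make the idea concrete, here is a minimal sketch of locality-aware scheduling under simplifying assumptions: every split lists the hosts that already store its data, each host has a fixed number of free task slots, and the scheduler prefers a local slot before falling back to a remote one. The names LocalityScheduler, Split, Assignment, and schedule are hypothetical; real Hadoop and Spark schedulers are considerably more involved (racks, delay scheduling, fairness).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of locality-aware scheduling (hypothetical names, not Hadoop's scheduler):
// each split lists the hosts that already store its data, and the scheduler prefers
// a free slot on one of those hosts before falling back to any other host.
class LocalityScheduler {

    record Split(String id, List<String> preferredHosts) {}

    record Assignment(String splitId, String host, boolean local) {}

    // freeSlots maps each host to the number of task slots still available on it.
    static List<Assignment> schedule(List<Split> splits, Map<String, Integer> freeSlots) {
        Map<String, Integer> slots = new LinkedHashMap<>(freeSlots); // leave the caller's map untouched
        List<Assignment> result = new ArrayList<>();
        for (Split split : splits) {
            String chosen = null;
            boolean local = false;
            // First choice: a host that already holds this split's data.
            for (String host : split.preferredHosts()) {
                if (slots.getOrDefault(host, 0) > 0) {
                    chosen = host;
                    local = true;
                    break;
                }
            }
            // Fallback: any host with a free slot; the data must then cross the network.
            if (chosen == null) {
                for (Map.Entry<String, Integer> entry : slots.entrySet()) {
                    if (entry.getValue() > 0) {
                        chosen = entry.getKey();
                        break;
                    }
                }
            }
            if (chosen == null) {
                throw new IllegalStateException("no free slot for split " + split.id());
            }
            slots.merge(chosen, -1, Integer::sum); // occupy one slot on the chosen host
            result.add(new Assignment(split.id(), chosen, local));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Split> splits = List.of(
                new Split("tempest", List.of("node1")),
                new Split("hamlet", List.of("node2")),
                new Split("lear", List.of("node2")));
        Map<String, Integer> freeSlots = new LinkedHashMap<>();
        freeSlots.put("node1", 2);
        freeSlots.put("node2", 1);
        schedule(splits, freeSlots).forEach(System.out::println);
    }
}
```

In this example, the lear split has to be read remotely because node2's only slot is already taken by hamlet: that is the 200 km trip to the library.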
When we have MongoDB deployed as a sharded cluster, there is! In the next few paragraphs, we’ll look at how this new feature in mongo-hadoop 2.0 works.

A sharded collection in MongoDB is partitioned into multiple chunks, which are individually housed on any number of shards. When you use mongo-hadoop to read from a sharded cluster, the default splitting strategy is to create one InputSplit (a “slice” of input data on which a worker node does computation) per chunk. The key improvement in mongo-hadoop 2.0 rests on metadata that MongoDB already maintains: the shard key range of every chunk, and which shard houses each chunk. Since InputSplits map directly to chunks, we can resolve the hostname of a shard that already has all the data needed for a given InputSplit.

However, we wouldn’t want to read directly from a shard, since MongoDB automatically moves chunks between shards to keep the data evenly distributed, and these migrations can happen at any time. This is why the correct way to read from a sharded cluster is through a mongos process, which routes each read to the shard that currently holds the chunk. The connector lets you specify the addresses of multiple mongos instances using the mongo.input.mongos_hosts setting:

    hadoop jar … -Dmongo.input.mongos_hosts=mongodb://foo:27017,mongodb://…

If the connector is told to read from a sharded collection, and mongo.input.mongos_hosts is not blank, then the connector will try to redirect the reads for each InputSplit to a mongos instance running on the same host as the shard that holds the split’s chunk. This means that we can achieve local reads by putting a DataNode process (for Hadoop) or a Spark executor on the same machine as a MongoDB shard and a mongos. In this layout, the number of MongoDB shards equals the number of Hadoop or Spark worker nodes.
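The following Java sketch models the split-tagging idea under stated assumptions; it is not the connector's source. Chunk boundaries and shard hostnames are passed in directly (in a real cluster they come from the cluster's metadata), mongo.input.mongos_hosts is assumed to be a comma-separated list of mongodb:// URIs like the one in the command above, and ChunkInfo, TaggedSplit, parseMongosHosts, and tagSplits are hypothetical names. Each chunk becomes one split, tagged with the mongos whose host matches one of the shard's hosts, or with a fallback URI when no co-located mongos exists.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: these are hypothetical names, not mongo-hadoop classes.
class MongosLocality {

    // Shard key bounds can be any BSON value, hence Object.
    record ChunkInfo(Object min, Object max, String shard) {}

    // One split per chunk, tagged with the mongos address to read it through.
    record TaggedSplit(Object min, Object max, String mongosUri) {}

    // Assumes a comma-separated list such as "mongodb://foo:27017,mongodb://bar:27017".
    static List<String> parseMongosHosts(String setting) {
        List<String> uris = new ArrayList<>();
        for (String part : setting.split(",")) {
            if (!part.isBlank()) {
                uris.add(part.trim());
            }
        }
        return uris;
    }

    // shardHosts maps a shard name to the hostnames of its members.
    static List<TaggedSplit> tagSplits(List<ChunkInfo> chunks,
                                       Map<String, List<String>> shardHosts,
                                       List<String> mongosUris,
                                       String defaultUri) {
        List<TaggedSplit> splits = new ArrayList<>();
        for (ChunkInfo chunk : chunks) {
            List<String> hosts = shardHosts.getOrDefault(chunk.shard(), List.of());
            String target = defaultUri; // no co-located mongos: use the normal input URI
            for (String uri : mongosUris) {
                String host = URI.create(uri).getHost();
                // A mongos on the same machine as the shard gives a local read.
                if (host != null && hosts.contains(host)) {
                    target = uri;
                    break;
                }
            }
            splits.add(new TaggedSplit(chunk.min(), chunk.max(), target));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<ChunkInfo> chunks = List.of(
                new ChunkInfo(0, 500, "shard0"),
                new ChunkInfo(500, 1000, "shard1"));
        Map<String, List<String>> shardHosts = Map.of(
                "shard0", List.of("foo"),
                "shard1", List.of("bar"));
        List<String> mongos = parseMongosHosts("mongodb://foo:27017,mongodb://bar:27017");
        tagSplits(chunks, shardHosts, mongos, "mongodb://router:27017")
                .forEach(System.out::println);
    }
}
```

The mongos is only used as a locality hint here: reads still go through a mongos, so a chunk migration in the middle of the job leads to a remote read rather than to missing data.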
We provide the addresses of all these mongos instances to the connector, so that it can automatically tag each InputSplit with the address of the mongos that lives on the same machine as the shard holding the data for that split. Keep in mind that you’ll also need to set an appropriate read preference so that the mongos can read from the shard server local to it. A read preference mode of nearest, for example, will select the server with the shortest ping time (i.e., the server local to the machine the task is running on), regardless of whether that shard server is a primary or a secondary in its replica set.

We have always recommended running a mongos instance on the same machine as a Hadoop worker when reading from a sharded collection, since reading through a local mongos involves only a single network hop. Now with mongo-hadoop 2.0, you can achieve data locality and faster processing by co-locating your MongoDB shards with your Hadoop/Spark worker nodes (see diagram above). If you’ve experienced high latency reading from MongoDB in Hadoop, we encourage you to give this new feature a try!
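Here is a simplified model of how nearest chooses a replica set member, assuming the latency-window rule from MongoDB's server selection specification: find the fastest member, keep every member whose round-trip time is within a threshold (15 ms by default) of it, and pick one of those at random. Tag sets and staleness limits are ignored, and Member, latencyWindow, and select are hypothetical names. When the mongos runs on the same machine as a shard member, that member's ping is usually far lower than the others', so it tends to be the only one in the window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Simplified model of the "nearest" read preference: a member is eligible if its
// round-trip time is within localThresholdMs of the fastest member, whether it is a
// primary or a secondary. Hypothetical names; tag sets and staleness are ignored.
class NearestReadPreference {

    record Member(String host, boolean primary, double rttMs) {}

    static List<Member> latencyWindow(List<Member> members, double localThresholdMs) {
        double fastest = Double.MAX_VALUE;
        for (Member m : members) {
            fastest = Math.min(fastest, m.rttMs());
        }
        List<Member> window = new ArrayList<>();
        for (Member m : members) {
            if (m.rttMs() <= fastest + localThresholdMs) {
                window.add(m);
            }
        }
        return window;
    }

    static Member select(List<Member> members, double localThresholdMs) {
        List<Member> window = latencyWindow(members, localThresholdMs);
        if (window.isEmpty()) {
            throw new IllegalArgumentException("no members to choose from");
        }
        // Choose randomly among the members inside the latency window.
        return window.get(ThreadLocalRandom.current().nextInt(window.size()));
    }

    public static void main(String[] args) {
        List<Member> shard0 = List.of(
                new Member("node2:27018", true, 42.0),  // primary on another machine
                new Member("node1:27018", false, 0.3),  // secondary local to the mongos
                new Member("node3:27018", false, 38.5));
        System.out.println(select(shard0, 15.0));
    }
}
```

With these made-up ping times, only the local secondary falls inside the window, so it is selected even though it is not the primary.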

September 15, 2016

MongoDB Connector for Hadoop 1.4

It’s been almost a year since the last feature release of the MongoDB Connector for Hadoop. We’re very pleased to announce the release of 1.4, which contains a few excellent improvements and many bug fixes. In this article, I’d like to focus on one improvement in particular that greatly improves the performance of the connector: support for Hadoop’s speculative execution.

What is Speculative Execution?

When processing data in a distributed system like Hadoop, it’s possible for some of the processing workload to be bottlenecked by one or two nodes that are especially slow. There could be many reasons why the nodes are slow, such as software misconfiguration or hardware failure. Perhaps the network around these nodes is especially saturated. Whatever the cause, the Hadoop job cannot complete until these slow nodes catch up on their work.

Speculative execution allows Hadoop to launch redundant copies of some tasks near the end of a job. This means that several nodes may end up with exactly the same segment of data to process, even though only one of these nodes needs to complete that work successfully for the job to finish. It becomes a race to the finish: if one or two of the nodes in the Hadoop cluster are slower than the rest, then any faster node given the same task may complete it first. In this case, the copies of the task running on the slower nodes are cancelled, and the job completes earlier than it would if it had to wait for the slower nodes. This seems like a clever optimization so far, but it caused some serious problems when writing to MongoDB from the Hadoop connector. Let’s take a closer look at what was causing this problem.

The Problem

MongoOutputFormat creates a MongoRecordWriter, which uses the MongoDB Java Driver to send each record straight to MongoDB as soon as it receives it. Recall that enabling speculative execution on a Hadoop cluster allows Hadoop to schedule tasks redundantly.
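The redundant scheduling described above, and why it causes trouble for a writer that sends records straight to the database, can be modelled in a few lines of Java. In this toy sketch, two attempts of the same task each write every record to a shared in-memory queue standing in for a collection, prefixing a random UUID as a stand-in for a driver-generated _id (real drivers generate ObjectIds). ExecutorService.invokeAny returns the first successful result and cancels the remaining attempts, which loosely mirrors Hadoop killing the slower attempt. The latch only makes the demo deterministic; node names and delays are made up.

```java
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of speculative execution with a writer that writes straight to the database.
class SpeculativeRace {

    record Result(String winner, List<String> written) {}

    static Callable<String> attempt(String node, long delayMs, List<String> docs,
                                    Queue<String> collection, CountDownLatch bothWrote) {
        return () -> {
            for (String doc : docs) {
                collection.add(UUID.randomUUID() + " " + doc); // written immediately
            }
            bothWrote.countDown();
            bothWrote.await();     // demo only: make sure both attempts have written
            Thread.sleep(delayMs); // the rest of the task on a healthy or a slow node
            return node;
        };
    }

    static Result race(List<String> docs, long slowMs, long fastMs) {
        Queue<String> collection = new ConcurrentLinkedQueue<>();
        CountDownLatch bothWrote = new CountDownLatch(2);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // invokeAny returns the first successful result and cancels the others.
            String winner = pool.invokeAny(List.of(
                    attempt("slow-node", slowMs, docs, collection, bothWrote),
                    attempt("fast-node", fastMs, docs, collection, bothWrote)));
            return new Result(winner, List.copyOf(collection));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("no attempt succeeded", e);
        } finally {
            pool.shutdownNow(); // interrupt the losing attempt
        }
    }

    public static void main(String[] args) {
        Result result = race(List.of("to", "be", "or"), 2_000, 50);
        System.out.println("winner: " + result.winner());
        // The loser was cancelled, but its writes were never rolled back.
        System.out.println("documents written: " + result.written().size());
    }
}
```

Running main reports the fast node as the winner, yet six documents were written for three input records: cancelling the losing attempt does not undo what it already sent.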
If all these redundant tasks are running at once, and it is very likely that they are, then each one of them is writing to MongoDB, regardless of whether it will ultimately complete. In cases where the job emits documents that already have an _id field, this can result in DuplicateKeyErrors: one of the redundant tasks wins the race, but the losers still try to insert documents with IDs that already exist because the winner inserted them! If the job emits documents that don’t have an _id, then the Java Driver adds one to each document automatically. Since the driver-generated IDs are unique, we dodge the DuplicateKeyErrors, but now we have duplicate documents in our database with different IDs! Either way, this is not desirable.

Previously, we recommended that users turn off speculative execution. This avoids the nasty behavior but shuts off a useful feature. I took a look at other Hadoop connectors, and they all face similar problems and make the same recommendation. The problem seemed endemic to writing to a live-running system.

Is there any case where Hadoop speculative execution can avoid duplicating records in the output? The answer is yes, and when writing output to a file, the solution is pretty easy. Each task creates a scratch directory where it can write temporary files, and as the task generates output, that output is written to a temporary file. The other piece of the puzzle is a class known as an OutputCommitter. The OutputCommitter defines methods that are called when a job or a task is about to start, has been aborted, or has completed. Usually, each OutputFormat defines a type of OutputCommitter to use. If you are using FileOutputFormat, for example, you’re also using the FileOutputCommitter. The FileOutputCommitter simply deletes the temporary directories of aborted tasks immediately.
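A minimal sketch of the write-to-scratch, commit-or-abort pattern using java.nio.file follows. It is a simplification for illustration, not Hadoop's FileOutputCommitter: the _temporary/attempt_N layout and the part-00000 file name are loosely modelled on Hadoop's conventions, the method names are hypothetical, and the job-level cleanup of _temporary is omitted. The winning attempt's file is moved into the job output directory, and the losing attempt's scratch directory is deleted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

// Simplified model of the scratch-directory commit protocol (hypothetical names).
class ScratchCommit {

    static Path scratchDir(Path outputDir, String attemptId) {
        return outputDir.resolve("_temporary").resolve(attemptId);
    }

    // Each task attempt writes into its own scratch directory, never the final output.
    static Path writeAttempt(Path outputDir, String attemptId, List<String> records) {
        try {
            Path scratch = scratchDir(outputDir, attemptId);
            Files.createDirectories(scratch);
            return Files.write(scratch.resolve("part-00000"), records);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Commit: move the successful attempt's file into the job output directory.
    static void commitTask(Path attemptFile, Path outputDir) {
        try {
            Files.move(attemptFile, outputDir.resolve(attemptFile.getFileName()),
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Abort: delete everything the cancelled attempt wrote.
    static void abortTask(Path outputDir, String attemptId) {
        try (Stream<Path> paths = Files.walk(scratchDir(outputDir, attemptId))) {
            // Reverse order visits files before the directories that contain them.
            for (Path p : paths.sorted(Comparator.reverseOrder()).toList()) {
                Files.delete(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // One speculative pair: attempt_1 wins and is committed, attempt_0 is aborted.
    static Path runDemo(List<String> records) {
        try {
            Path outputDir = Files.createTempDirectory("job-output");
            writeAttempt(outputDir, "attempt_0", records);               // slow node
            Path winner = writeAttempt(outputDir, "attempt_1", records); // fast node
            commitTask(winner, outputDir);
            abortTask(outputDir, "attempt_0");
            return outputDir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> readPart(Path outputDir) {
        try {
            return Files.readAllLines(outputDir.resolve("part-00000"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path out = runDemo(List.of("the", "tempest"));
        System.out.println(readPart(out)); // written once, by the winning attempt
    }
}
```

Both attempts produced the same records, but only one copy ends up in the output directory, because only committed attempts are ever promoted.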
In the case of our slow nodes, their tasks were rescheduled on other, faster nodes that finished first, so the slow nodes’ attempts are aborted and their temporary files are cleaned up. The tasks that finished on the fast nodes have their temporary files collected into a single directory that represents the output for the entire job. Since the output comes only from tasks that completed successfully, there are no duplicate records in the output.

We took a similar approach to supporting speculative execution when writing to MongoDB. Instead of writing directly to MongoDB, MongoRecordWriter now writes to a temporary directory. Each insert or update operation is written in a special serialized format. When a task is aborted, these files are removed. When a task completes, the MongoOutputCommitter reads the file and performs each operation. This is enough to let the Hadoop connector play nicely with speculative execution. At this point, however, we can go a step further and enable another optimization.

Another Optimization

For almost a year now, MongoDB drivers have supported a bulk operations API. MongoDB server versions 2.6 and later support bulk operations, which tend to complete much faster than the same operations sent serially. The Hadoop connector had never taken advantage of the bulk API. However, now that each task produces a frozen batch of operations already in the form of a temporary file, it’s fairly straightforward to use the bulk API to send these operations to MongoDB. Using the bulk API in a Hadoop job, which can process and produce terabytes or even petabytes of data, has an enormous positive impact on performance.

We did our best to measure exactly what kind of benefit this provides. We wrote an “identity” MapReduce job (i.e., a job whose output is identical to its input, with no processing in the middle). The input for the job was a large BSON file, akin to what might be produced by the “mongodump” program.
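The staged-write approach described above, together with the bulk optimization, can be sketched with in-memory stand-ins for both the temporary file and the collection. Everything here is hypothetical: the tab-separated operation format, StagingWriter, FakeCollection, and the batch size of 1,000 are illustrative assumptions, not the connector's actual serialized format or the Java driver's bulk API. Each call to bulkInsert models one round trip carrying a whole batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of staging writes during the task and replaying them in bulk on commit.
class StagedBulkCommit {

    // Stand-in for a collection: a unique _id index plus a round-trip counter.
    static class FakeCollection {
        final Map<String, String> docs = new HashMap<>();
        int roundTrips = 0;

        // One call models one network round trip carrying a batch of inserts.
        void bulkInsert(List<String[]> batch) {
            roundTrips++;
            for (String[] op : batch) {
                if (docs.putIfAbsent(op[1], op[2]) != null) {
                    throw new IllegalStateException("duplicate key: " + op[1]);
                }
            }
        }
    }

    // Record-writer side: serialize each operation instead of sending it.
    static class StagingWriter {
        final List<String> stagedLines = new ArrayList<>(); // stands in for the temporary file

        void insert(String id, String body) {
            stagedLines.add("insert\t" + id + "\t" + body);
        }
    }

    // Commit side: read the staged operations back and send them in batches.
    static void commitTask(StagingWriter attempt, FakeCollection target, int batchSize) {
        List<String[]> batch = new ArrayList<>();
        for (String line : attempt.stagedLines) {
            batch.add(line.split("\t", 3)); // [op, _id, body]
            if (batch.size() == batchSize) {
                target.bulkInsert(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            target.bulkInsert(batch); // final partial batch
        }
    }

    // Abort side: discard the staged operations; nothing ever reached the database.
    static void abortTask(StagingWriter attempt) {
        attempt.stagedLines.clear();
    }

    public static void main(String[] args) {
        FakeCollection collection = new FakeCollection();
        StagingWriter slow = new StagingWriter();
        StagingWriter fast = new StagingWriter();
        for (int i = 0; i < 2_500; i++) {
            // Both speculative attempts process the same split.
            slow.insert("doc" + i, "{\"n\": " + i + "}");
            fast.insert("doc" + i, "{\"n\": " + i + "}");
        }
        commitTask(fast, collection, 1_000); // the winner's operations are applied
        abortTask(slow);                     // the loser's are thrown away
        System.out.println(collection.docs.size() + " docs in " + collection.roundTrips + " round trips");
    }
}
```

With 2,500 staged inserts and batches of 1,000, the winner's operations reach the collection in 3 round trips instead of 2,500, and the aborted attempt contributes no documents at all.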
We compared the performance of the “identity” job before and after the MongoOutputCommitter and bulk write changes on a 5-node Hadoop cluster running CDH4. The input to the job was the “enron emails” data set, which comprises 501,513 documents, each about 4 KB in size. Before the MongoOutputCommitter and bulk write changes, the Hadoop job took 147 minutes to complete. Of course, some of this measurement represents time spent moving splits between nodes in the Hadoop cluster, but much of that time was Hadoop connector overhead, since there was no processing required in this job. After the bulk write changes, the same job took 6 minutes! If we assume that most of the remaining 6 minutes of execution is connector overhead as well (moving data to MongoDB still has to take some amount of time), then that’s almost a 96% improvement ((147 - 6) / 147 ≈ 95.9%)!

Not only have we fixed a bug, but we’ve also made a tremendous improvement in the performance of the connector in a pretty common use case (i.e., using MongoDB as a sink for Hadoop jobs). We hope that this improvement and the others in 1.4 make our users very happy, and that our users continue to be a great, supportive community around this project.

To take advantage of the improvement discussed here and many others, download version 1.4.0 of the MongoDB Hadoop Connector by adding the following to your pom.xml:

    <dependency>
        <groupId>org.mongodb.mongo-hadoop</groupId>
        <artifactId>mongo-hadoop-core</artifactId>
        <version>1.4.0</version>
    </dependency>

Or, if you use Gradle, try this:

    compile 'org.mongodb.mongo-hadoop:mongo-hadoop-core:1.4.0'

You can also visit the project’s home on GitHub and download the jars directly from the “releases” page. Finally, you can read all the release notes here.

Thank you.

The JVM Drivers Team

July 9, 2015