Hadoop Streaming Support for MongoDB

MongoDB

#Releases

MongoDB has some native data processing tools, such as the built-in Javascript-oriented MapReduce framework, and a new Aggregation Framework in MongoDB v2.2. That said, there will always be a need to decouple persistance and computational layers when working with Big Data.

Enter MongoDB+Hadoop: an adapter that allows Apache’s Hadoop platform to integrate with MongoDB.

Using this adapter, it is possible to use MongoDB as a real-time datastore for your application while shifting large aggregation, batch processing, and ETL workloads to a platform better suited for the task.

          

Well, the engineers at 10gen have taken it one step further with the introduction of the streaming assembly for Mongo-Hadoop.

What does all that mean?

The streaming assembly lets you write MapReduce jobs in languages like Python, Ruby, and JavaScript instead of Java, making it easy for developers that are familiar with MongoDB and popular dynamic programing languages to leverage the power of Hadoop.

                    

It works like this:

Once a developer has Java installed and Hadoop ready to rock they download and build the adapter. With the adapter built, you compile the streaming assembly, load some data into Mongo, and get down to writing some MapReduce jobs.

The assembly streams data from MongoDB into Hadoop and back out again, running it through the mappers and reducers defined in a language you feel at home with. Cool right?

Ruby support was recently added and is particularly easy to get started with. Lets take a look at an example where we analyze twitter data.

Import some data into MongoDB from twitter:

This script curls the twitter status stream and and pipes the json into mongodb using mongoimport. The mongoimport binary has a couple of flags: “-d” which specifies the database “twitter” and -c which specifies the collection “in”.

Next, write a Mapper and save it in a file called mapper.rb:

#!/usr/bin/env ruby
require 'mongo-hadoop'
<p>MongoHadoop.map do |document|
{ :_id => document['user']['time_zone'], :count => 1 }
end

The mapper needs to call the MongoHadoop.map function and passes it a block. This block takes an argument “docuement” and emits a hash containing the user’s timezone and a count of 1.

Now, write a Reducer and save it in a file called reducer.rb:

#!/usr/bin/env ruby
require 'mongo-hadoop'
<p>MongoHadoop.reduce do |key, values|
count = sum = 0</p>
<p>values.each do |value|
count += 1
sum += value['num']
end</p>
<p>{ :_id => key, :average => sum / count }
end

The reducer calls the MongoHadoop.reduce function and passes it a block. This block takes two parameters, a key and an array of values for that key, reduces the values into a single aggregate and emits a hash with the same key and the newly reduced value.

To run it all, create a shell script that executes hadoop with the streaming assembly jar and tells it how to find the mapper and reducer files as well as where to retrieve and store the data:

hadoop jar mongo-hadoop-streaming-assembly*.jar -mapper mapper.rb -reducer reducer.rb -inputURI mongodb://127.0.0.1/twitter.in -outputURI mongodb://127.0.0.1/twitter.out

Make them all executable by running chmod +x on the all the scripts and run twit.sh to have hadoop process the job.