Tutorial for Operationalizing Spark with MongoDB

Matt Kalan


Update (August 4th, 2016): Since this original post, MongoDB has released a new certified connector for Spark. Click through for a tutorial on using the new MongoDB Connector for Apache Spark.

Introduction

If you are familiar with Hadoop, you know that Hive, Pig, MapReduce, and more recently Spark can use HDFS as the input and output for your Hadoop jobs. What you might not know is that you can just as easily use MongoDB as an input or output for those jobs. I showed this last time with the MongoDB Connector for Hadoop using Spark’s RDD API, but this time I’m going to show it using the open source spark-mongodb package from Stratio, which takes the approach of using Spark’s DataFrames and provides several advantages we will explore.

One of the enormous benefits of using MongoDB for your datasets is the ability to define secondary indexes, so your processing jobs can read any slice of the data with millisecond latency. As a result, instead of scanning data on HDFS or in a key-value database like HBase (where lookups are by primary key), you can pull just the slice of data you need, by any dimension, and then apply the distributed processing and analytics you want in much less time, bridging your operational datasets and your heavy analytical workloads.

Likewise, after you analyze the data, you can output it to MongoDB, for low latency reporting and for backing real-time operational applications that often run the business. In this way, you can deliberately choose whether you want the storage for your data to be HDFS (without indexes) or MongoDB (with secondary indexes) for each dataset. You can make this decision based on how you are going to use the dataset (low latency user interaction or high latency scanning of data). Unfortunately, many people start by storing all their data in an HDFS-only data lake, only later to discover they don’t have the real-time access their business demands.

Tutorial

I am going to walk through a basic example of using MongoDB as an input and output for Spark jobs using the Spark DataFrames API and the open source spark-mongodb connector by Stratio, listed on the standard spark-packages.org community site. You can read more about Stratio’s use of MongoDB and Spark in a recent Leaf in the Wild blog post.

This example takes 1-minute time series intervals of stock prices, with the opening (first) price, high (max), low (min), and closing (last) price of each time interval, and turns them into 5-minute intervals (called OHLC bars). A more typical scenario is that you record this time series data in MongoDB for real-time purposes but then run offline analytical models in another environment. Of course the models would be way more complicated; this is just a “Hello World” level example. I chose OHLC bars because the data was easy to find (normally, this would be one price at a point in time).
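Before getting into Spark SQL, here is a minimal Scala sketch (my own illustration, not code from the connector or the original walkthrough) of what the aggregation does: a window of 1-minute bars collapses into one 5-minute bar that keeps the first open, the maximum high, the minimum low, and the last close. The Bar case class and toFiveMinuteBar function are hypothetical names used only for illustration.

case class Bar(open: Double, high: Double, low: Double, close: Double)

//Combine a window of 1-minute bars into a single OHLC bar covering the whole window
def toFiveMinuteBar(oneMinBars: Seq[Bar]): Bar = Bar(
  open  = oneMinBars.head.open,        //opening price of the first 1-minute bar
  high  = oneMinBars.map(_.high).max,  //highest price across the window
  low   = oneMinBars.map(_.low).min,   //lowest price across the window
  close = oneMinBars.last.close        //closing price of the last 1-minute bar
)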

Installation

I started from the Spark-packages site, which led me to the directions in the spark-mongodb GitHub repo and its About page listing the dependencies. Below, I used the versions of each dependency from the About page in the interest of time (rather than risk running into issues with newer versions), but I am not aware of issues with later versions of those dependencies.

Steps to set up environment:

  1. Download spark-mongodb connector - the latest spark-mongodb Jar (currently v0.8.7) can be downloaded from the Spark-packages page under Releases.

  2. Download Hadoop - many think of Spark as separate from Hadoop, but all the Spark downloads seem to require it. I downloaded the Hadoop 2.6.0 binaries from the Hadoop Releases page.
  3. Configure Hadoop - I used the instructions for a single-node standalone cluster for this simple example. I had Java installed already, so I set JAVA_HOME and also added the HADOOP_PREFIX environment variable to hadoop-env.sh.
  4. Download Spark - I downloaded Spark v1.4.0 from the Downloads page, pre-built for Hadoop 2.6 or later. It comes with Scala 2.10, so it does not need a separate install for that dependency.
  5. Download the MongoDB driver jars - I had an issue running spark-shell with --packages (related to cached local dependencies not being updated), so I ran it with --jars and therefore needed to download these jars from the Maven.org repository. These are 3 jars for the MongoDB Scala driver (called Casbah) and 1 for the Java driver underneath the Scala driver. The latest ones would likely work, but I used the versions shown in the spark-shell command in step 12 (Casbah 2.8.0 for Scala 2.10 and mongo-java-driver 2.13.0).
  6. Download MongoDB - there are a few different ways to install MongoDB, either via our Cloud Manager cloud-hosted management suite, or directly downloading the Community or Enterprise Advanced versions for your OS platform. You should run the latest 3.0.x version (3.1.x are development releases).

  7. Install MongoDB - follow the install directions for your OS in the Install Guides section. It is available via package managers on many platforms.
  8. Start MongoDB - the steps above tell you how to start the mongod process, but if not:
    • Directions for Cloud Manager
    • Brief directions from the command line: create a data directory (the default is /data/db). The mongod binary is in the install directory's /bin. To start on the default port and default data directory, you can just run ./mongod to start the server in that shell window (unless you run it with --fork, it will keep running in that window, waiting for connections).
  9. Download data - I Googled for some sample pricing data and found these 1 minute bars from this site under the Equities heading, 1-Min bar. I named the file equities-msft-minute-bars-2009.csv
  10. Load the data into MongoDB - you can use the mongoimport command from /bin to load the data into the MongoDB database:
    mongoimport equities-msft-minute-bars-2009.csv --type csv --headerline -d marketdata -c minbars
  11. View data in MongoDB - you can start the Mongo shell (which uses JavaScript) with the command /bin/mongo to see the format of the minute bars data:
    use marketdata			//switches to the marketdata DB
    db.minbars.findOne()		//shows one sample document from the collection 
    {
    	"_id" : ObjectId("55fc881eb8c8b6db875e13fe"),
    	"Symbol" : "MSFT",
    	"Timestamp" : "2009-08-24 09:31",
    	"Day" : 24,
    	"Open" : 24.32,
    	"High" : 24.33,
    	"Low" : 24.28,
    	"Close" : 24.3,
    	"Volume" : 207651
    }
    >db.minbars.find().sort({Timestamp: 1})		//this shows the first 20 documents sorted by Timestamp - note they are at 1-minute intervals
  12. Run Spark’s Scala shell - you can start Spark’s Scala interactive shell with the following command from /bin, changing the path to each jar appropriately:
    $ ./spark-shell --jars <path-to-jars>/spark-mongodb-core-0.8.7.jar,<path-to-jars>/casbah-commons_2.10-2.8.0.jar,<path-to-jars>/casbah-core_2.10-2.8.0.jar,<path-to-jars>/casbah-query_2.10-2.8.0.jar,<path-to-jars>/mongo-java-driver-2.13.0.jar
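Once the shell comes up, a quick sanity check (my own addition, not strictly required) is to confirm that the SparkContext sc is available and that you are on the expected Spark version:

sc.version		//should print 1.4.0, matching the Spark download in step 4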

Representing a MongoDB collection as a DataFrame

If you have worked with DataFrames before, this should look very similar to setting up a read from any other data source. The code below creates a configuration object, a SQL context (a HiveContext), and then the DataFrame representing the collection of 1-minute bars in MongoDB to read from.

This is for running from the interactive Spark shell we started in the last step of the Installation steps above (so you can play around with the data too). The following code is also in my repository on GitHub.

//Import the relevant packages and classes
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.provider._
import com.stratio.provider.mongodb._
import com.stratio.provider.mongodb.schema._
import com.stratio.provider.mongodb.writer._
import org.apache.spark.sql.hive.HiveContext
import MongodbConfig._

//Configure which database and collection to read from, with optional parameters too
val mcInputBuilder = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "marketdata", Collection -> "minbars", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal))
val readConfig = mcInputBuilder.build()

//HiveContext uses Hive's SQL parser with a superset of features of SQLContext so I used that one
//	See http://spark.apache.org/docs/1.4.0/sql-programming-guide.html#starting-point-sqlcontext for more info
val sqlContext = new HiveContext(sc)			//sc is already defined as a SparkContext by the shell
val dfOneMin = sqlContext.fromMongoDB(readConfig) 	//set up the MongoDB collection to read from as a DataFrame
dfOneMin.registerTempTable("minbars")			//make the table minbars available to the SQL expressions later
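At this point, although not shown in the original walkthrough, you can optionally confirm that the connector inferred a reasonable schema and can read documents from the collection:

dfOneMin.printSchema()	//prints the fields inferred by sampling the collection (per SamplingRatio above)
dfOneMin.first()		//reads back one row of the minbars collection through the connector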

Querying Results and Saving to MongoDB

This builds on the packages and variables defined above. It runs a sub-query with window functions over the 1-minute bars in the minbars table (backed by a DataFrame reading from a collection in MongoDB), selects 5-minute bars from those results into a new DataFrame, and finally writes that DataFrame to MongoDB.

//This applies SQL window functions to partition the 1-minute bars into 5-minute windows
//	and then selects the open, high, low, & close price within each 5-minute window
val dfFiveMinForMonth = sqlContext.sql(
"""
SELECT m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
FROM
(SELECT
    Symbol,
    FIRST_VALUE(Timestamp)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as OpenTime,
    LAST_VALUE(Timestamp)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as CloseTime,
    FIRST_VALUE(Open)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as Open,
    MAX(High)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as High,
    MIN(Low)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as Low,
    LAST_VALUE(Close)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as Close
FROM minbars) as m
WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4""")
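Optionally (not part of the original flow), you can preview a few of the computed 5-minute bars before writing them back to MongoDB; since DataFrames are evaluated lazily, this is what triggers reading the 1-minute bars at this point:

dfFiveMinForMonth.show(5)	//prints the first 5 computed 5-minute bars to the console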

//Configure which collection we want to write to in MongoDB
val fiveMinOutputBuilder = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "marketdata", Collection -> "fiveMinBars", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal, SplitKey -> "_id", SplitSize -> 8))
val writeConfig = fiveMinOutputBuilder.build()

//Write the data to MongoDB - because of Spark's just-in-time execution, this actually triggers running the query
//	to read from the 1-minute bars collection in MongoDB and then write to the 5-minute bars collection in MongoDB
dfFiveMinForMonth.saveToMongodb(writeConfig)

You can see in the Mongo shell that the data was loaded into MongoDB. The timestamps are now at 5-minute intervals, since we aggregated the 1-minute bars up into 5-minute bars.

$ ./mongo
> use marketdata
> db.fiveMinBars.find().sort({Timestamp: 1})
{ "_id" : ObjectId("560a98eed4c677ae8b31d6e9"), "High" : 24.42, "Close" : 24.42, "Timestamp" : "2009-08-24 09:30", "Symbol" : "MSFT", "Open" : 24.41, "Low" : 24.28 }
{ "_id" : ObjectId("560a98eed4c677ae8b31d7b0"), "High" : 24.49, "Close" : 24.48, "Timestamp" : "2009-08-24 09:35", "Symbol" : "MSFT", "Open" : 24.41, "Low" : 24.38 }
{ "_id" : ObjectId("560a98eed4c677ae8b31d817"), "High" : 24.56, "Close" : 24.56, "Timestamp" : "2009-08-24 09:40", "Symbol" : "MSFT", "Open" : 24.48, "Low" : 24.47 }
{ "_id" : ObjectId("560a98eed4c677ae8b31d8db"), "High" : 24.6, "Close" : 24.51, "Timestamp" : "2009-08-24 09:45", "Symbol" : "MSFT", "Open" : 24.56, "Low" : 24.49 }
{ "_id" : ObjectId("560a98eed4c677ae8b31d8dd"), "High" : 24.52, "Close" : 24.48, "Timestamp" : "2009-08-24 09:50", "Symbol" : "MSFT", "Open" : 24.5, "Low" : 24.47 }

Running Spark on any slice of data

With secondary indexes on your data in MongoDB, the spark-mongodb connector pushes filter criteria (i.e. WHERE clauses) down to the database, so you can run Spark queries on any slice of your data WITHOUT table scans. Contrast that with running a Spark job on data in HDFS or HBase with filter criteria on anything other than the primary key, where all of your data would be scanned. You get the best of both worlds: the power of Spark running only on the slice of data you care about, selected via an index in MongoDB, and therefore at the highest possible performance.
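Note that for the pushed-down filter to use an index rather than a collection scan, there needs to be a secondary index on Timestamp (the profiler output below shows one named Timestamp_1). The original steps don't show creating it; here is a minimal sketch of one way to do it from the same Scala shell using the Casbah driver already on the classpath (you could equally run db.minbars.createIndex({Timestamp: 1}) in the mongo shell):

import com.mongodb.casbah.Imports._

//Create a secondary index on Timestamp so filtered reads can use an index scan (IXSCAN)
val minbarsColl = MongoClient("localhost", 27017)("marketdata")("minbars")
minbarsColl.createIndex(MongoDBObject("Timestamp" -> 1))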

Here is a query similar to the read example above, creating 5-minute bars from 1-minute bars, but only for the month of July.

val dfFiveMinForMonth = sqlContext.sql(
"""
SELECT m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
FROM
(SELECT
    Symbol,
    FIRST_VALUE(Timestamp)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as OpenTime,
    LAST_VALUE(Timestamp)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as CloseTime,
    FIRST_VALUE(Open)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as Open,
    MAX(High)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as High,
    MIN(Low)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as Low,
    LAST_VALUE(Close)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
    as Close
FROM minbars
WHERE Timestamp >= '2010-07-01' AND Timestamp < '2010-08-01') as m
WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4""")

You can turn on the MongoDB profiler (e.g. with db.setProfilingLevel(2) in the mongo shell) and see that the query actually runs in MongoDB with the filter criteria applied against the index (the first batch of 101 results is returned on the cursor for this initial query).

$ ./mongo
> use marketdata
> db.system.profile.find({op:"query", ns:"marketdata.minbars"}).sort({ts:-1}).limit(1).pretty()
{
    "op" : "query",
    "ns" : "marketdata.minbars",
    "query" : {
        "Timestamp" : {
            "$gte" : "2010-07-01",
            "$lt" : "2010-08-01"
        }
    },
    ...
            "inputStage" : {
                "stage" : "IXSCAN",
                "nReturned" : NumberInt(101),
                "executionTimeMillisEstimate" : NumberInt(0),	//rounded to 0 ms
                ...
                "keyPattern" : {
                    "Timestamp" : NumberInt(1)
                },
                "indexName" : "Timestamp_1",
                "isMultiKey" : false,
                "direction" : "forward",
                "indexBounds" : {
                    "Timestamp" : [
                        "[\"2010-07-01\", \"2010-08-01\")"
                    ]
                },
                "keysExamined" : NumberInt(101),
                ...
            }
        }
    },
    ...
}

Summary

This demonstrates how easily the power of Spark can be combined with the power of MongoDB to meet the operational requirements of analytics and a data lake environment. The most common architectural patterns for using MongoDB with Spark are:

  1. MongoDB as application DB - the data resides in MongoDB as an operational system and is processed by Spark on-demand via a DataFrame
  2. MongoDB as storage in a data lake - for data that you want to analyze with Spark, and especially data you want to query by slices (where indexes are enormously helpful), you would store those datasets of your data lake in MongoDB and run Spark on just those slices. You might still store other datasets on HDFS where scanning the data is sufficient.
  3. Results operationalized in MongoDB - after you run analytics in Spark, you often want the results to be distributed to end users via reporting or operational applications. With ad hoc indexing, dynamic schema, and easy scaling, MongoDB is the best location to operationalize your analysis from Spark.

These benefits hold in any data lake environment, and apply whether you are using Hive, Pig, or any other MapReduce-based processing. With Spark increasingly seen as a key piece of Hadoop going forward, I have highlighted how easy it is for MongoDB to interact with and/or be a part of your data lake.


Learn more about MongoDB and Apache Spark by reading our guide below:

Turning Analytics into Real-Time Action


About the Author - Matt Kalan

Matt Kalan is a Sr. Solution Architect at MongoDB with extensive experience helping more than 500 customers across industries solve business problems with technology. His particular focus these days is on guiding enterprises in maximizing the business value of enterprise data management (EDM) through all the noise in a fast-moving market. Before MongoDB, Matt grew Progress Software’s Apama Algorithmic Trading and Complex Event Processing (CEP) Platform business in North America and later sold broader operational intelligence solutions. He previously worked for Caplin Systems selling solutions to stream real-time market data over the web to FX and FI portals, and for Sapient providing consulting services to global 2000 clients.