Using MongoDB with Hadoop & Spark: Part 3 - Spark Example & Key Takeaways

< View all blog posts
Matt Kalan
February 19, 2015
Category: Business

Update: August 4th 2016 Since this original post, MongoDB has released a new certified connector for Spark. Click through for a tutorial on using the new MongoDB Connector for Apache Spark.

Welcome to the final part of our three-part series on MongoDB and Hadoop. In this post, we'll look at a Spark example.

  1. Introduction & Setup of Hadoop and MongoDB
  2. Hive Example
  3. Spark Example & Key Takeaways

For more detail on the use case, see the first paragraph of part 1.

Spark Example For using Spark, I opted to use Python from the interactive shell command “pyspark”. This gives you an interactive Python environment for leveraging Spark classes. I see Python used a lot among quants; it seems like a more natural language to use (vs Java or Scala) for interactive querying.

The benefits of Spark were immediately evident, and more in line with what you would expect in an interactive environment – queries return very quickly, much faster than Hive, due in part to the fact they are not compiled to MapReduce. While this latency is still an order of magnitude greater than what I have come to expect from MongoDB, including the aggregation framework, there are more options for analysis with Spark, so it clearly has a role to play for data analytics.

You can find the below script in spark-ohlcbars-example.py in my Github repo.

# set up parameters for reading from MongoDB via Hadoop input format
config = {"mongo.input.uri": "mongodb://localhost:27017/marketdata.minbars"}
inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
# these values worked but others might as well
keyClassName = "org.apache.hadoop.io.Text"
valueClassName = "org.apache.hadoop.io.MapWritable"

# read the 1-minute bars from MongoDB into Spark RDD format
minBarRawRDD = sc.newAPIHadoopRDD(inputFormatClassName, keyClassName, valueClassName, None, None, config)

# configuration for output to MongoDB
config["mongo.output.uri"] = "mongodb://localhost:27017/marketdata.fiveminutebars"
outputFormatClassName = "com.mongodb.hadoop.MongoOutputFormat"

# takes the verbose raw structure (with extra metadata) and strips down to just the pricing data
minBarRDD = minBarRawRDD.values()

import calendar, time, math

dateFormatString = '%Y-%m-%d %H:%M'

# sort by time and then group into each bar in 5 minutes
groupedBars = minBarRDD.sortBy(lambda doc: str(doc["Timestamp"])).groupBy(lambda doc: 
    (doc["Symbol"], math.floor(calendar.timegm(time.strptime(doc["Timestamp"], dateFormatString)) / (5*60))))

# define function for looking at each group and pulling out OHLC
# assume each grouping is a tuple of (symbol, seconds since epoch) and a resultIterable of 1-minute OHLC records in the group

# write function to take a (tuple, group); iterate through group; and manually pull OHLC
def ohlc(grouping):
    low = sys.maxint
    high = -sys.maxint
    i = 0
    groupKey = grouping[0]
    group = grouping[1]
    for doc in group:
        #take time and open from first bar
        if i == 0:
            openTime = doc["Timestamp"]
            openPrice = doc["Open"]
        #assign min and max from the bar if appropriate
        if doc["Low"] < low:
            low = doc["Low"]
        if doc["High"] > high:
            high = doc["High"]
        i = i + 1            
        # take close of last bar
        if i == len(group):
            close = doc["Close"]
    outputDoc = {"Symbol": groupKey[0], 
        "Timestamp": openTime, 
        "Open": openPrice,
        "High": high,
        "Low": low,
        "Close": close}
    # tried returning [None, outputDoc] and seemed exception earlier
    return (None, outputDoc)


resultRDD = groupedBars.map(ohlc)


# This causes ClassCastException apparently because of an issue in Spark logged as SPARK-5361.  Should write to MongoDB but could not test.
# resultRDD.saveAsNewAPIHadoopFile("file:///placeholder", outputFormatClassName, None, None, None, None, config)

I saw the appeal of Spark from my first introduction. It was pretty easy to use. It is also especially nice in that it has operations that run on all elements in a list or a matrix of data. I can also see the appeal of having statistical capabilities like R, but in which the data can be distributed across many nodes easily (there is a Spark project for R as well).

The downside is that it certainly is new and I seemed to run into a non-trival bug (SPARK-5361 now fixed in 1.2.2+) that prevented me from writing from pyspark to a Hadoop file (writing to Hadoop & MongoDB in Java & Scala should work). Also I found it hard to visualize the data as I was manipulating it. It reminded me of my college days being frustrated debugging matrices representing ray traces in Matlab, before they added better tooling. For printing the data in the RDD structures, there is a function collect() for creating lists that are more easily printable but some elements were still not printable, e.g. Iterables.

Key Takeaways of Hive & Spark Exercise

Easy to integrate MongoDB Overall it was useful to see how data in MongoDB can be accessed via Hive and Spark. In retrospect, I spent more time manipulating the data in the two toolsets than I did integrating them with MongoDB, which is what I had hoped. I also started with a pre-configured VM instead of setting up the environment, so likely the infrastructure setup would take considerably longer.

Significant learning curve for Hadoop The Hadoop tutorials showed me there is a significant learning curve for learning all the projects that make up the Hadoop ecosystem. The Cloudera tutorials alone involved Sqoop, Avro, Hive, Spark, and Flume, all with their own learning curves. Furthermore, Hive and Spark are very different from one another. I see a lot of opportunity for front-ends to be built to hide this complexity, and for managing all the data formats that could end up in your “data lake.”

Real-life applicability A more real-life scenario for this kind of data manipulation is storing and querying real-time, intraday market data in MongoDB. Prices update throughout the current day, allowing users to querying them in real-time. Using Hive or Spark, after end of day (even if the next day begins immediately like in FX), individual ticks can be aggregated into structures that are more efficient to access, such as these OHLC bars, or large documents with arrays of individual ticks for the day by ticker symbol. This approach gives great write throughput during the day for capture, as well as blazing fast access to weeks, month, or years of prices. There are users of MongoDB whose systems follow this approach, and who have dramatically reduced latency for analytics, as well as reduced their hardware footprint. By storing the aggregated data back in MongoDB, you can index the data flexibly and retrieve it quickly.

When to use MongoDB & Hadoop MongoDB gives you an easy programming language API, aggregation framework for grouping and manipulating data, rich querying, and easy scaling across many servers. There are many data science users that use the Java, Python, or other drivers supported by MongoDB, or the community drivers for R and Matlab to pull data into their application for analysis. You could even run your analytics against a secondary node (with priority=0 to not be primary) without impacting the performance of your primary.

However, in some cases your data is larger than you can process in one application instance or one server. One approach is to iteratively transfer new data to a Hadoop cluster to benefit from highly parallelized analysis. You could either use the MongoDB Hadoop Connector to pull it into Hadoop or an ETL tool to push it there. Similarly, you might want to enrich or combine data in MongoDB with data from other sources. To do this, you could use the Hadoop Connector to access the data in a unified way across the data, independent of its source.

Probably more importantly is that, once you analyze data in Hadoop, the work of reporting and operationalizing the results often need to be done. The MongoDB Hadoop Connector makes it easy to process results and put them into MongoDB, for blazing fast reporting and querying with all the benefits of an operational database. It doesn’t just cache the data but provides full indexing support which I does not exist in Hadoop. MongoDB excels at providing sub-second response times on any subset of data, which users demand for reporting and operational uses in their work.

Overall, the benefit of the MongoDB Hadoop Connector, is combining the benefits of highly parallel analysis in Hadoop with low latency, rich querying for operational purposes from MongoDB and allowing technology teams to focus on data analysis rather than integration.


Want to learn more about Apache Spark and MongoDB? Read our white paper on turning analytics into real-time action.

Read more about Apache Spark and MongoDB


<< Read Part 2

About Matt Kalan

Matt Kalan is a Sr. Solution Architect at MongoDB, with extensive experience helping more than 300 customers in financial services and other industries solve business problems with technology. Before MongoDB, Matt grew Progress Software’s Apama Algorithmic Trading and Complex Event Processing (CEP) Platform business in North America and later sold broader operational intelligence solutions to FS firms. He previously worked for Caplin Systems selling solutions to stream real-time market data over the web to FX and FI portals, and for Sapient providing consulting services to global 2000 clients.

comments powered by Disqus