GIANT Stories at MongoDB

The Future of Big Data Architecture

Matt Kalan

Business

The big data problem

It is probably apparent to everyone reading that data is growing at an enormous rate. There is extremely valuable insight to be found in this data if it is harnessed effectively, and traditional technologies, many of which, like RDBMSs, were initially designed 40 years ago, are not sufficient for creating the business value promised by the “Big Data” hype. A common example of using Big Data technology is the “Single View of the Customer” – aggregating everything you know about a customer in order to optimize your engagement and revenue with them, e.g. determining exactly which promotions to send them, via which channel, and when.

Tutorial for Operationalizing Spark with MongoDB

Matt Kalan

Technical

If you are familiar with Hadoop, you know that Hive, Pig, MapReduce, and more recently Spark can use HDFS for the inputs and outputs of your Hadoop jobs. What you might not know is that you can just as easily use MongoDB as an input or output for Hadoop jobs. I showed this last time with the MongoDB Connector for Hadoop using Spark’s RDD API, but this time I’m going to show it using the open source spark-mongodb package from Stratio, which takes the approach of using Spark’s DataFrames and provides several advantages we will explore.

Using MongoDB with Hadoop & Spark: Part 3 - Spark Example & Key Takeaways

Matt Kalan

Business

**Update: August 4th 2016** Since this original post, MongoDB has released a new certified connector for Spark. Click through for a tutorial on using the new MongoDB Connector for Apache Spark.

Welcome to the final part of our three-part series on MongoDB and Hadoop. In this post, we'll look at a Spark example.

  1. Introduction & Setup of Hadoop and MongoDB
  2. Hive Example
  3. Spark Example & Key Takeaways

For more detail on the use case, see the first paragraph of part 1.

Spark Example

For Spark, I opted to use Python from the interactive shell command “pyspark”. This gives you an interactive Python environment for leveraging Spark classes. I see Python used a lot among quants; it seems like a more natural language than Java or Scala for interactive querying.

The benefits of Spark were immediately evident, and more in line with what you would expect in an interactive environment – queries return very quickly, much faster than Hive, due in part to the fact they are not compiled to MapReduce. While this latency is still an order of magnitude greater than what I have come to expect from MongoDB, including the aggregation framework, there are more options for analysis with Spark, so it clearly has a role to play for data analytics.

You can find the script below in spark-ohlcbars-example.py in my GitHub repo.


# set up parameters for reading from MongoDB via Hadoop input format
config = {"mongo.input.uri": "mongodb://localhost:27017/marketdata.minbars"}
inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
# these values worked but others might as well
keyClassName = "org.apache.hadoop.io.Text"
valueClassName = "org.apache.hadoop.io.MapWritable"
 
# read the 1-minute bars from MongoDB into Spark RDD format
minBarRawRDD = sc.newAPIHadoopRDD(inputFormatClassName, keyClassName, valueClassName, None, None, config)

# configuration for output to MongoDB
config["mongo.output.uri"] = "mongodb://localhost:27017/marketdata.fiveminutebars"
outputFormatClassName = "com.mongodb.hadoop.MongoOutputFormat"
 
# takes the verbose raw structure (with extra metadata) and strips down to just the pricing data
minBarRDD = minBarRawRDD.values()
 
import calendar, time, math, sys  # sys is needed for sys.maxint below
 
dateFormatString = '%Y-%m-%d %H:%M'
 
# sort by time and then group into each bar in 5 minutes
groupedBars = minBarRDD.sortBy(lambda doc: str(doc["Timestamp"])).groupBy(lambda doc: 
    (doc["Symbol"], math.floor(calendar.timegm(time.strptime(doc["Timestamp"], dateFormatString)) / (5*60))))
 
# define a function that looks at each group and pulls out the OHLC values
# each grouping is a tuple of a (symbol, 5-minute bucket) key and a
# resultIterable of the 1-minute OHLC records in that group
def ohlc(grouping):
    low = sys.maxint
    high = -sys.maxint
    i = 0
    groupKey = grouping[0]
    group = grouping[1]
    for doc in group:
        #take time and open from first bar
        if i == 0:
            openTime = doc["Timestamp"]
            openPrice = doc["Open"]
        #assign min and max from the bar if appropriate
        if doc["Low"] < low:
            low = doc["Low"]
        if doc["High"] > high:
            high = doc["High"]
        i = i + 1            
        # take close of last bar
        if i == len(group):
            close = doc["Close"]
    outputDoc = {"Symbol": groupKey[0], 
        "Timestamp": openTime, 
        "Open": openPrice,
        "High": high,
        "Low": low,
        "Close": close}
    # returning a list [None, outputDoc] raised an exception earlier, so return a tuple instead
    return (None, outputDoc)


resultRDD = groupedBars.map(ohlc)

 
# This causes a ClassCastException due to an issue in Spark logged as SPARK-5361.
# It should write to MongoDB, but I could not test it.
# resultRDD.saveAsNewAPIHadoopFile("file:///placeholder", outputFormatClassName, None, None, None, None, config)
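Since the commented-out write above is blocked by SPARK-5361, here is a workaround sketch (an assumption on my part, not something I tested): collect the results on the driver, strip the None keys the Hadoop output format expects, and insert the documents directly with pymongo.

```python
# Workaround sketch, assuming pymongo is installed and mongod is running.

def strip_keys(pairs):
    """Drop the None keys emitted for the Hadoop output format, keeping the documents."""
    return [doc for _, doc in pairs]

# e.g. docs = strip_keys(resultRDD.collect())
# then, hypothetically:
#   from pymongo import MongoClient
#   MongoClient("mongodb://localhost:27017").marketdata.fiveminutebars.insert_many(docs)
```

Note that collect() pulls everything to the driver, so this only works when the result set fits in one process's memory.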

I saw the appeal of Spark from my first introduction. It was pretty easy to use. It is also especially nice that it has operations that run on all elements of a list or a matrix of data. I can also see the appeal of having statistical capabilities like R’s, but with data that can easily be distributed across many nodes (there is a Spark project for R as well).

The downside is that it certainly is new, and I seemed to run into a non-trivial bug (SPARK-5361, now fixed in 1.2.2+) that prevented me from writing from pyspark to a Hadoop file (writing to Hadoop & MongoDB in Java & Scala should work). I also found it hard to visualize the data as I was manipulating it. It reminded me of my college days being frustrated debugging matrices representing ray traces in Matlab, before they added better tooling. For printing the data in the RDD structures, there is a function collect() for creating lists that are more easily printable, but some elements, e.g. Iterables, were still not printable.

Key Takeaways of Hive & Spark Exercise

Easy to integrate MongoDB

Overall it was useful to see how data in MongoDB can be accessed via Hive and Spark. In retrospect, I spent more time manipulating the data in the two toolsets than integrating them with MongoDB, which is what I had hoped. I also started with a pre-configured VM instead of setting up the environment myself, so the infrastructure setup would likely have taken considerably longer.

Significant learning curve for Hadoop

The Hadoop tutorials showed me that there is a significant learning curve for all the projects that make up the Hadoop ecosystem. The Cloudera tutorials alone involved Sqoop, Avro, Hive, Spark, and Flume, each with its own learning curve. Furthermore, Hive and Spark are very different from one another. I see a lot of opportunity for front-ends to be built to hide this complexity, and for managing all the data formats that could end up in your “data lake.”

Real-life applicability

A more real-life scenario for this kind of data manipulation is storing and querying real-time, intraday market data in MongoDB. Prices update throughout the current day, allowing users to query them in real time. Using Hive or Spark after end of day (even if the next day begins immediately, as in FX), individual ticks can be aggregated into structures that are more efficient to access, such as these OHLC bars, or into large documents holding arrays of the day's individual ticks per ticker symbol. This approach gives great write throughput during the day for capture, as well as blazing fast access to weeks, months, or years of prices. There are MongoDB users whose systems follow this approach and who have dramatically reduced latency for analytics, as well as their hardware footprint. By storing the aggregated data back in MongoDB, you can index the data flexibly and retrieve it quickly.
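For illustration, here is a hypothetical shape for one of those per-day tick-array documents (the field names are my own, not from this series):

```python
# Hypothetical document shape: one document per symbol per day,
# with the day's individual ticks embedded as an array.
day_ticks_doc = {
    "Symbol": "MSFT",
    "Day": "2009-08-24",
    "Ticks": [
        {"Time": "09:30:00.125", "Price": 24.41, "Size": 100},
        {"Time": "09:30:00.750", "Price": 24.42, "Size": 300},
        # ... one entry per tick for the day
    ],
}
```

A single read then retrieves a whole day of ticks for a symbol, which is what makes access to long price histories so fast.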

When to use MongoDB & Hadoop

MongoDB gives you an easy programming-language API, an aggregation framework for grouping and manipulating data, rich querying, and easy scaling across many servers. Many data science users pull data into their applications for analysis with the Java, Python, or other drivers supported by MongoDB, or with the community drivers for R and Matlab. You could even run your analytics against a secondary node (with priority=0 so it never becomes primary) without impacting the performance of your primary.
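As a sketch of that aggregation framework grouping (assuming the minbars fields used throughout this series), a pipeline that rolls the 1-minute bars up into daily OHLC bars with $first/$last might look like:

```python
# Sketch of an aggregation pipeline over marketdata.minbars (fields assumed
# from this series: Symbol, Day, Timestamp, Open, High, Low, Close).
# $first/$last pull the open and close of each group directly.
daily_ohlc_pipeline = [
    {"$sort": {"Timestamp": 1}},                      # order bars by time first
    {"$group": {
        "_id": {"Symbol": "$Symbol", "Day": "$Day"},  # one bucket per symbol per day
        "Open":  {"$first": "$Open"},                 # first bar's open
        "High":  {"$max":   "$High"},
        "Low":   {"$min":   "$Low"},
        "Close": {"$last":  "$Close"},                # last bar's close
    }},
]
# With pymongo and a running mongod, this would run as:
#   db.minbars.aggregate(daily_ohlc_pipeline)
```

$first and $last are order-dependent, hence the $sort stage before the $group.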

However, in some cases your data is larger than you can process in one application instance or on one server. One approach is to iteratively transfer new data to a Hadoop cluster to benefit from highly parallelized analysis. You could either use the MongoDB Hadoop Connector to pull it into Hadoop or an ETL tool to push it there. Similarly, you might want to enrich or combine data in MongoDB with data from other sources; the Hadoop Connector lets you access all of that data in a unified way, independent of its source.

Probably more important is that, once you have analyzed data in Hadoop, the work of reporting and operationalizing the results often still needs to be done. The MongoDB Hadoop Connector makes it easy to process results and put them into MongoDB for blazing fast reporting and querying, with all the benefits of an operational database. It doesn’t just cache the data but provides full indexing support, which does not exist in Hadoop. MongoDB excels at providing sub-second response times on any subset of data, which users demand for reporting and operational uses in their work.

Overall, the benefit of the MongoDB Hadoop Connector is that it combines highly parallel analysis in Hadoop with the low-latency, rich querying of MongoDB for operational purposes, allowing technology teams to focus on data analysis rather than integration.


Want to learn more about Apache Spark and MongoDB? Read our white paper on turning analytics into real-time action.
Read more about Apache Spark and MongoDB

<< Read Part 2

About Matt Kalan

Matt Kalan is a Sr. Solution Architect at MongoDB, with extensive experience helping more than 300 customers in financial services and other industries solve business problems with technology. Before MongoDB, Matt grew Progress Software’s Apama Algorithmic Trading and Complex Event Processing (CEP) Platform business in North America and later sold broader operational intelligence solutions to FS firms. He previously worked for Caplin Systems selling solutions to stream real-time market data over the web to FX and FI portals, and for Sapient providing consulting services to global 2000 clients.

Using MongoDB with Hadoop & Spark: Part 2 - Hive Example

Matt Kalan

Technical

**Update: August 4th 2016** Since this original post, MongoDB has released a new certified connector for Spark. Click through for a tutorial on using the new MongoDB Connector for Apache Spark.

Welcome to part two of our three-part series on MongoDB and Hadoop. In part one, we introduced Hadoop and how to set it up. In this post, we'll look at a Hive example.

  1. Introduction & Setup of Hadoop and MongoDB
  2. Hive Example
  3. Spark Example & Key Takeaways

For more detail on the use case, see the first paragraph of part 1.

Summary

Use case: aggregating 1 minute intervals of stock prices into 5 minute intervals
Input: 1 minute stock prices intervals in a MongoDB database
Simple Analysis: performed in:

  • Hive
  • Spark

Output: 5 minute stock prices intervals in Hadoop

Hive Example

I ran the following example from the Hive command line (simply typing the command “hive” with no parameters), not Cloudera’s Hue editor, as that would have required additional installation steps. I immediately noticed the criticism people have of Hive: everything is compiled into MapReduce, which takes considerable time. I ran most things with just 20 records to make the queries run quickly.

This creates the definition of the table in Hive that matches the structure of the data in MongoDB. MongoDB has a dynamic schema for variable data shapes but Hive and SQL need a schema definition.

You can also find the Hive SQL statements below in hive-ohlcbars-example.sql in my GitHub repo.


CREATE EXTERNAL TABLE minute_bars
(
    id STRUCT<oid:STRING, bsontype:INT>,
    Symbol STRING,
    Timestamp STRING,
    Day INT,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE,
    Volume INT
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id",
    "Symbol":"Symbol", "Timestamp":"Timestamp", "Day":"Day", "Open":"Open", "High":"High", "Low":"Low", "Close":"Close", "Volume":"Volume"}')
TBLPROPERTIES('mongo.uri'='mongodb://localhost:27017/marketdata.minbars');

Recent changes in the Apache Hive repo make the mappings necessary even if you are keeping the field names the same. This should be changed in the MongoDB Hadoop Connector soon if not already by the time you read this.

Then I ran the following command to create a Hive table for the 5 minute bars:


CREATE TABLE five_minute_bars
(
    id STRUCT<oid:STRING, bsontype:INT>,
    Symbol STRING,
    Timestamp STRING,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE
);

This insert statement uses SQL windowing functions to group five 1-minute periods and determine the OHLC for each 5-minute period. There are definitely other ways to do this, but here is the one I figured out. Grouping in SQL is a little different from grouping in the MongoDB aggregation framework (in which you can pull the first and last of a group easily), so it took me a little while to remember how to do it with a subquery.

The subquery takes each group of five 1-minute records/documents, sorts them by time, and computes the open, high, low, and close price up to that record in each 5-minute period. The outside WHERE clause then selects the last 1-minute bar in each period (because that row in the subquery has the correct OHLC information for its 5-minute period). I definitely welcome easier-to-understand queries, but you can also run the subquery by itself to see what it’s doing.


INSERT INTO TABLE five_minute_bars
SELECT m.id, m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
FROM
(SELECT
    id,
    Symbol,
    FIRST_VALUE(Timestamp)
        OVER (PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
              ORDER BY Timestamp)
        as OpenTime,
    LAST_VALUE(Timestamp)
        OVER (PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
              ORDER BY Timestamp)
        as CloseTime,
    FIRST_VALUE(Open)
        OVER (PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
              ORDER BY Timestamp)
        as Open,
    MAX(High)
        OVER (PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
              ORDER BY Timestamp)
        as High,
    MIN(Low)
        OVER (PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
              ORDER BY Timestamp)
        as Low,
    LAST_VALUE(Close)
        OVER (PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
              ORDER BY Timestamp)
        as Close
FROM minute_bars) as m
WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4;

I can definitely see the benefit of being able to use SQL to access data in MongoDB, and optionally in other databases and file formats, all with the same commands, while the mapping differences are handled in the table declarations. The downside is that the latency is quite high, but that could be offset somewhat by the ability to scale horizontally across many nodes. I think this is the appeal of Hive for most people: they can scale to very large data volumes using traditional SQL, when latency is not a primary concern.

Post #3 in this blog series shows similar examples using Spark.

  1. Introduction & Setup of Hadoop and MongoDB
  2. Hive Example
  3. Spark Example & Key Takeaways


<< Read Part 1

Read Part 3 >>




Using MongoDB with Hadoop & Spark: Part 1 - Introduction & Setup

Matt Kalan

Technical

**Update: August 4th 2016** Since this original post, MongoDB has released a new certified connector for Spark. Click through for a tutorial on using the new MongoDB Connector for Apache Spark.

Hadoop is a software technology designed for storing and processing large volumes of data distributed across a cluster of commodity servers and commodity storage. Hadoop was initially inspired by papers published by Google outlining its approach to handling large volumes of data as it indexed the Web. Many organizations are now harnessing the power of Hadoop and MongoDB together to create complete big data applications: MongoDB powers the online, real-time operational application, while Hadoop consumes data from MongoDB and blends it with data from other operational systems to fuel sophisticated analytics and machine learning.

This is part one of a three-part series on MongoDB and Hadoop:

  1. Introduction & Setup of Hadoop and MongoDB
  2. Hive Example
  3. Spark Example & Key Takeaways

Introduction & Setup of Hadoop and MongoDB

There are many, many data management technologies available today, and that makes it hard to discern hype from reality. Working at MongoDB Inc., I know the immense value of MongoDB as a great real-time operational database for applications; however for analytics and batch operations, I wanted to understand more clearly the options available and when to use some of the other great options like Spark.

I started with a simple example of taking 1 minute time series intervals of stock prices with the opening (first) price, high (max), low (min), and closing (last) price of each time interval and turning them into 5 minute intervals (called OHLC bars). The 1-minute data is stored in MongoDB and is then processed in Hive or Spark via the MongoDB Hadoop Connector, which allows MongoDB to be an input or output to/from Hadoop and Spark.

A more typical example is that you record this market data in MongoDB for real-time purposes but then run offline analytical models in another environment. Of course the models would be far more complicated – this is just a Hello World-level example. I chose OHLC bars simply because that was the data I found easily.
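The OHLC roll-up itself is simple enough to sketch in plain Python, independent of Hive or Spark (the sample fields mirror the minbars documents used in this series):

```python
import calendar
import time
from itertools import groupby

FMT = "%Y-%m-%d %H:%M"

def bucket(bar):
    """Key for a 5-minute bucket: (symbol, seconds since epoch // 300)."""
    secs = calendar.timegm(time.strptime(bar["Timestamp"], FMT))
    return (bar["Symbol"], secs // 300)

def five_minute_bars(minute_bars):
    """Roll 1-minute OHLC bars up into 5-minute OHLC bars."""
    # sorting by (Symbol, Timestamp) makes each bucket's bars consecutive,
    # which groupby requires
    bars = sorted(minute_bars, key=lambda b: (b["Symbol"], b["Timestamp"]))
    out = []
    for (symbol, _), grp in groupby(bars, key=bucket):
        grp = list(grp)
        out.append({
            "Symbol": symbol,
            "Timestamp": grp[0]["Timestamp"],          # open time of the bucket
            "Open":  grp[0]["Open"],                   # first bar's open
            "High":  max(b["High"] for b in grp),
            "Low":   min(b["Low"] for b in grp),
            "Close": grp[-1]["Close"],                 # last bar's close
        })
    return out
```

The Hive and Spark examples in parts 2 and 3 compute exactly this, but distributed across a cluster.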

Summary

Use case: aggregating 1 minute intervals of stock prices into 5 minute intervals
Input: 1 minute stock prices intervals in a MongoDB database
Simple Analysis: performed in:

  • Hive
  • Spark

Output: 5 minute stock prices intervals in Hadoop

Steps to Set Up the Environment

  • Set up Hadoop environment – Hadoop is fairly involved to set up but fortunately Cloudera makes VMs available with their distribution already installed, including both Hive and Spark. I downloaded Virtualbox (open source VM manager) onto my Mac laptop to run the Cloudera VMs from here.
  • Go through tutorials - I went through the tutorials included in the VM, which were pretty helpful; sometimes I felt like I was just cutting and pasting without knowing what was happening though, especially with Spark. One thing that is obvious from the tutorials is that the learning curve for using “Hadoop” includes learning many products in the ecosystem (Sqoop, Avro, Hive, Flume, Spark, etc.). If I were only doing this simple thing, there is no way I would use Hadoop with that learning curve, but of course some problems justify the effort.
  • Download sample data – I Googled for some sample pricing data and found these 1 minute bars from this site
  • Install MongoDB on the VM – it is really easy with yum on CentOS from this page
  • Start MongoDB – a default configuration file is installed by yum so you can just run this to start on localhost and the default port 27017
    
    mongod -f /etc/mongod.conf
    
  • Load sample data – mongoimport allows you to load CSV files directly as a flat document in MongoDB. The command is simply this:
    
    mongoimport equities-msft-minute-bars-2009.csv --type csv --headerline -d marketdata -c minbars
    
  • Install MongoDB Hadoop Connector – I ran through the steps at the link below to build for Cloudera 5 (CDH5). One note is that by default my “git clone” put the mongo-hadoop files in /etc/profile.d but your repository might be set up differently. Also one addition to the install steps is to set the path to mongoimport in build.gradle based on where you installed MongoDB. I used yum and the path to mongo tools was /usr/bin/. Install steps are here

For the following examples, here is what a document looks like in the MongoDB collection (via the Mongo shell). You start the Mongo shell simply with the command “mongo” from the /bin directory of the MongoDB installation.


> use marketdata
> db.minbars.findOne()
{
    "_id" : ObjectId("54c00d1816526bc59d84b97c"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:30",
    "Day" : 24,
    "Open" : 24.41,
    "High" : 24.42,
    "Low" : 24.31,
    "Close" : 24.31,
    "Volume" : 683713
}

Posts #2 and #3 in this blog series show examples of Hive and Spark using this setup above.

  1. Introduction & Setup of Hadoop and MongoDB
  2. Hive Example
  3. Spark Example & Key Takeaways


Read Part 2 >>
