Matt Kalan


Using MongoDB with Hadoop & Spark: Part 3 - Spark Example & Key Takeaways

**Update: August 4th 2016** Since this original post, MongoDB has released a new certified connector for Spark. Click through for a tutorial on using the new MongoDB Connector for Apache Spark.

Welcome to the final part of our three-part series on MongoDB and Hadoop. In this post, we'll look at a Spark example.

1. Introduction & Setup of Hadoop and MongoDB
2. Hive Example
3. Spark Example & Key Takeaways

For more detail on the use case, see the first paragraph of part 1.

**Spark Example**

For Spark, I opted to use Python from the interactive shell command "pyspark". This gives you an interactive Python environment for leveraging Spark classes. I see Python used a lot among quants; it seems like a more natural language to use (vs. Java or Scala) for interactive querying.

The benefits of Spark were immediately evident, and more in line with what you would expect in an interactive environment: queries return very quickly, much faster than in Hive, due in part to the fact that they are not compiled into MapReduce jobs. While this latency is still an order of magnitude greater than what I have come to expect from MongoDB, including its aggregation framework, Spark offers more options for analysis, so it clearly has a role to play in data analytics.

You can find the script below as spark-ohlcbars-example.py in my GitHub repo.

```python
import sys, calendar, time, math

# set up parameters for reading from MongoDB via the Hadoop input format
config = {"mongo.input.uri": "mongodb://localhost:27017/marketdata.minbars"}
inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
# these values worked but others might as well
keyClassName = "org.apache.hadoop.io.Text"
valueClassName = "org.apache.hadoop.io.MapWritable"

# read the 1-minute bars from MongoDB into Spark RDD format
# ('sc' is the SparkContext provided automatically by the pyspark shell)
minBarRawRDD = sc.newAPIHadoopRDD(inputFormatClassName, keyClassName, valueClassName, None, None, config)

# configuration for output to MongoDB
config["mongo.output.uri"] = "mongodb://localhost:27017/marketdata.fiveminutebars"
outputFormatClassName = "com.mongodb.hadoop.MongoOutputFormat"

# take the verbose raw structure (with extra metadata) and strip it down to just the pricing data
minBarRDD = minBarRawRDD.values()

dateFormatString = '%Y-%m-%d %H:%M'

# sort by time and then group the 1-minute bars into 5-minute buckets per symbol
groupedBars = minBarRDD.sortBy(lambda doc: str(doc["Timestamp"])) \
    .groupBy(lambda doc: (doc["Symbol"],
                          math.floor(calendar.timegm(time.strptime(doc["Timestamp"], dateFormatString)) / (5*60))))

# define a function for looking at each group and pulling out the OHLC values
# each grouping is a tuple of (symbol, 5-minute bucket) and an iterable of 1-minute OHLC records in the group
def ohlc(grouping):
    low = sys.maxint       # Python 2, as on the Cloudera VM
    high = -sys.maxint
    i = 0
    groupKey = grouping[0]
    group = grouping[1]
    for doc in group:
        # take time and open from the first bar
        if i == 0:
            openTime = doc["Timestamp"]
            openPrice = doc["Open"]
        # assign min and max from the bar if appropriate
        if doc["Low"] < low:
            low = doc["Low"]
        if doc["High"] > high:
            high = doc["High"]
        i = i + 1
        # take the close of the last bar
        if i == len(group):
            close = doc["Close"]
    outputDoc = {"Symbol": groupKey[0],
                 "Timestamp": openTime,
                 "Open": openPrice,
                 "High": high,
                 "Low": low,
                 "Close": close}
    # tried returning [None, outputDoc] and it seemed to throw an exception earlier
    return (None, outputDoc)

resultRDD = groupedBars.map(ohlc)

# This causes a ClassCastException, apparently because of an issue in Spark logged as SPARK-5361.
# It should write to MongoDB, but I could not test it.
# resultRDD.saveAsNewAPIHadoopFile("file:///placeholder", outputFormatClassName, None, None, None, None, config)
```
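Since the SPARK-5361 bug blocked saveAsNewAPIHadoopFile from pyspark at the time, one possible workaround (my addition, not from the original post, and only sensible for a small result set like this one) would be to collect the results on the driver and write them with pymongo directly. This is a minimal sketch, assuming pymongo 3.x is installed on the driver:

```python
# Hypothetical workaround (not in the original post): for a small result set,
# collect to the driver and write to MongoDB with pymongo instead of the
# Hadoop output format. Assumes pymongo 3.x is installed where pyspark runs.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
fiveMinBars = client.marketdata.fiveminutebars

# resultRDD contains (None, outputDoc) tuples; keep just the documents
docs = [doc for (_, doc) in resultRDD.collect()]
if docs:
    fiveMinBars.insert_many(docs)
```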
I saw the appeal of Spark from my first introduction. It was pretty easy to use, and it is especially nice that it has operations that run on all elements of a list or a matrix of data. I can also see the appeal of having statistical capabilities like those of R, but with data that can easily be distributed across many nodes (there is a Spark project for R as well).

The downside is that Spark is certainly new, and I ran into a non-trivial bug (SPARK-5361, since fixed in 1.2.2+) that prevented me from writing from pyspark to a Hadoop file (writing to Hadoop and MongoDB from Java and Scala should work). I also found it hard to visualize the data as I was manipulating it. It reminded me of my college days of being frustrated debugging matrices representing ray traces in Matlab, before better tooling was added. For printing the data in the RDD structures, there is a collect() function for creating lists that are more easily printable, but some elements, such as Iterables, were still not printable.
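As an aside on inspecting RDDs (this snippet is my addition, not from the original post), one way to make grouped data printable is to convert each group's iterable into a plain list before collecting:

```python
# Sketch for inspecting grouped data in the pyspark shell (not from the original post).
# groupBy() produces (key, iterable) pairs; converting each iterable to a list
# makes the collected output print readably.
printableBars = groupedBars.mapValues(list).collect()

# show the first few 5-minute groups and a summary of their 1-minute bars
for key, bars in printableBars[:3]:
    print(key, [(b["Timestamp"], b["Open"], b["Close"]) for b in bars])
```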
**Key Takeaways of the Hive & Spark Exercise**

**Easy to integrate MongoDB**

Overall it was useful to see how data in MongoDB can be accessed via Hive and Spark. In retrospect, I spent more time manipulating the data in the two toolsets than I did integrating them with MongoDB, which is what I had hoped. I also started with a pre-configured VM instead of setting up the environment myself, so the infrastructure setup would likely take considerably longer.

**Significant learning curve for Hadoop**

The Hadoop tutorials showed me there is a significant learning curve for all the projects that make up the Hadoop ecosystem. The Cloudera tutorials alone involved Sqoop, Avro, Hive, Spark, and Flume, each with its own learning curve. Furthermore, Hive and Spark are very different from one another. I see a lot of opportunity for front-ends to be built to hide this complexity, and for tools to manage all the data formats that could end up in your "data lake."

**Real-life applicability**

A more real-life scenario for this kind of data manipulation is storing and querying real-time, intraday market data in MongoDB. Prices update throughout the current day, allowing users to query them in real time. Using Hive or Spark after the end of day (even if the next day begins immediately, as in FX), individual ticks can be aggregated into structures that are more efficient to access, such as these OHLC bars, or large documents with arrays of individual ticks for the day by ticker symbol. This approach gives great write throughput during the day for capture, as well as blazing-fast access to weeks, months, or years of prices. There are users of MongoDB whose systems follow this approach and who have dramatically reduced both their latency for analytics and their hardware footprint. By storing the aggregated data back in MongoDB, you can index the data flexibly and retrieve it quickly.

**When to use MongoDB & Hadoop**

MongoDB gives you an easy programming-language API, an aggregation framework for grouping and manipulating data, rich querying, and easy scaling across many servers. Many data science users rely on the Java, Python, or other drivers supported by MongoDB, or the community drivers for R and Matlab, to pull data into their applications for analysis. You could even run your analytics against a secondary node (with priority=0 so it cannot become primary) without impacting the performance of your primary.

However, in some cases your data is larger than you can process in one application instance or on one server. One approach is to iteratively transfer new data to a Hadoop cluster to benefit from highly parallelized analysis. You could either use the MongoDB Hadoop Connector to pull it into Hadoop or an ETL tool to push it there. Similarly, you might want to enrich or combine data in MongoDB with data from other sources; the Hadoop Connector lets you access the data in a unified way, independent of its source.

Perhaps more importantly, once you analyze data in Hadoop, the work of reporting and operationalizing the results often still needs to be done. The MongoDB Hadoop Connector makes it easy to process results and put them into MongoDB, for blazing-fast reporting and querying with all the benefits of an operational database. It doesn't just cache the data; it provides full indexing support, which does not exist in Hadoop. MongoDB excels at providing sub-second response times on any subset of data, which users demand for reporting and operational uses.

Overall, the benefit of the MongoDB Hadoop Connector is combining highly parallel analysis in Hadoop with low-latency, rich querying for operational purposes from MongoDB, allowing technology teams to focus on data analysis rather than integration.

Want to learn more about Apache Spark and MongoDB? Read our white paper on turning analytics into real-time action.

Read more about Apache Spark and MongoDB

< Read Part 2

February 19, 2015

Using MongoDB with Hadoop & Spark: Part 2 - Hive Example

**Update: August 4th 2016** Since this original post, MongoDB has released a new certified connector for Spark. Click through for a tutorial on using the new MongoDB Connector for Apache Spark.

Welcome to part two of our three-part series on MongoDB and Hadoop. In part one, we introduced Hadoop and how to set it up. In this post, we'll look at a Hive example.

1. Introduction & Setup of Hadoop and MongoDB
2. Hive Example
3. Spark Example & Key Takeaways

For more detail on the use case, see the first paragraph of part 1.

**Summary**

- Use case: aggregating 1-minute intervals of stock prices into 5-minute intervals
- Input: 1-minute stock price intervals in a MongoDB database
- Simple analysis: performed in Hive and Spark
- Output: 5-minute stock price intervals in Hadoop

**Hive Example**

I ran the following example from the Hive command line (simply typing the command "hive" with no parameters), not Cloudera's Hue editor, as that would have needed additional installation steps. I immediately noticed the common criticism of Hive: everything is compiled into MapReduce, which takes considerable time. I ran most things with just 20 records so the queries would run quickly.

The statement below creates the definition of the table in Hive that matches the structure of the data in MongoDB. MongoDB has a dynamic schema for variable data shapes, but Hive and SQL need a schema definition. You can also find the Hive SQL statements below in hive-ohlcbars-example.sql in my GitHub repo.

```sql
CREATE EXTERNAL TABLE minute_bars
(
    id STRUCT<oid:STRING, bsontype:INT>,
    Symbol STRING,
    Timestamp STRING,
    Day INT,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE,
    Volume INT
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id", "Symbol":"Symbol", "Timestamp":"Timestamp", "Day":"Day", "Open":"Open", "High":"High", "Low":"Low", "Close":"Close", "Volume":"Volume"}')
TBLPROPERTIES('mongo.uri'='mongodb://localhost:27017/marketdata.minbars');
```

Recent changes in the Apache Hive repo make the column mappings necessary even if you are keeping the field names the same. This should be changed in the MongoDB Hadoop Connector soon, if not already by the time you read this.

Then I ran the following command to create a Hive table for the 5-minute bars:

```sql
CREATE TABLE five_minute_bars
(
    id STRUCT<oid:STRING, bsontype:INT>,
    Symbol STRING,
    Timestamp STRING,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE
);
```

The insert statement below uses SQL windowing functions to group five 1-minute periods and determine the OHLC values for the 5 minutes. There are definitely other ways to do this, but here is one I figured out. Grouping in SQL is a little different from grouping in the MongoDB aggregation framework (in which you can pull the first and last of a group easily), so it took me a little while to remember how to do it with a subquery. The subquery takes each group of five 1-minute records/documents, sorts them by time, and takes the open, high, low, and close price up to that record in each 5-minute period. Then the outer WHERE clause selects the last 1-minute bar in that period (because that row in the subquery has the correct OHLC information for its 5-minute period). I definitely welcome easier-to-understand queries, but you can run the subquery by itself to see what it's doing.

```sql
INSERT INTO TABLE five_minute_bars
SELECT m.id, m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
FROM
(SELECT
    id,
    Symbol,
    FIRST_VALUE(Timestamp) OVER (
        PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
        ORDER BY Timestamp) as OpenTime,
    LAST_VALUE(Timestamp) OVER (
        PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
        ORDER BY Timestamp) as CloseTime,
    FIRST_VALUE(Open) OVER (
        PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
        ORDER BY Timestamp) as Open,
    MAX(High) OVER (
        PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
        ORDER BY Timestamp) as High,
    MIN(Low) OVER (
        PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
        ORDER BY Timestamp) as Low,
    LAST_VALUE(Close) OVER (
        PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
        ORDER BY Timestamp) as Close
FROM minute_bars) as m
WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4;
```
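For comparison, here is a rough sketch (my addition, not from the original post) of the same 5-minute OHLC grouping expressed in the MongoDB aggregation framework via pymongo. It assumes the marketdata.minbars collection from part 1, with Timestamp stored as a "YYYY-MM-DD HH:mm" string, and a more recent MongoDB server (4.0+) than existed when this post was written, for $dateFromString with a format and $toLong:

```python
# Rough aggregation-framework equivalent of the Hive windowing query
# (a sketch under the assumptions stated above, not the original post's method).
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
minbars = client.marketdata.minbars

five_min_ms = 5 * 60 * 1000  # 5 minutes in milliseconds

pipeline = [
    # parse the string timestamp into a date
    {"$addFields": {
        "ts": {"$dateFromString": {"dateString": "$Timestamp", "format": "%Y-%m-%d %H:%M"}}
    }},
    # floor each timestamp to its 5-minute boundary (in epoch milliseconds)
    {"$addFields": {
        "bucket": {"$subtract": [
            {"$toLong": "$ts"},
            {"$mod": [{"$toLong": "$ts"}, five_min_ms]}
        ]}
    }},
    # $first/$last depend on document order, so sort by time before grouping
    {"$sort": {"Symbol": 1, "ts": 1}},
    {"$group": {
        "_id": {"Symbol": "$Symbol", "bucket": "$bucket"},
        "Timestamp": {"$first": "$Timestamp"},
        "Open": {"$first": "$Open"},
        "High": {"$max": "$High"},
        "Low": {"$min": "$Low"},
        "Close": {"$last": "$Close"},
    }},
    # write the results back to a collection in the same database
    {"$out": "fiveminutebars"},
]

minbars.aggregate(pipeline, allowDiskUse=True)
```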
I can definitely see the benefit of being able to use SQL to access data in MongoDB and, optionally, in other databases and file formats, all with the same commands, while the mapping differences are handled in the table declarations. The downside is that the latency is quite high, but that can be offset somewhat by the ability to scale horizontally across many nodes. I think this is the appeal of Hive for most people: they can scale to very large data volumes using traditional SQL, and latency is not a primary concern.

Post #3 in this blog series shows similar examples using Spark.

1. Introduction & Setup of Hadoop and MongoDB
2. Hive Example
3. Spark Example & Key Takeaways

Want to learn more about Apache Spark and MongoDB? Read our white paper on turning analytics into real-time action.

Read more about Apache Spark and MongoDB

< Read Part 1 >

**About Matt Kalan**

Matt Kalan is a Sr. Solution Architect at MongoDB, with extensive experience helping more than 300 customers in financial services and other industries solve business problems with technology. Before MongoDB, Matt grew Progress Software's Apama Algorithmic Trading and Complex Event Processing (CEP) Platform business in North America and later sold broader operational intelligence solutions to FS firms. He previously worked for Caplin Systems, selling solutions to stream real-time market data over the web to FX and FI portals, and for Sapient, providing consulting services to Global 2000 clients.

February 17, 2015

Using MongoDB with Hadoop & Spark: Part 1 - Introduction & Setup

**Update: August 4th 2016** Since this original post, MongoDB has released a new certified connector for Spark. Click through for a tutorial on using the new MongoDB Connector for Apache Spark.

Hadoop is a software technology designed for storing and processing large volumes of data distributed across a cluster of commodity servers and commodity storage. Hadoop was initially inspired by papers published by Google outlining its approach to handling large volumes of data as it indexed the web. Many organizations are now harnessing the power of Hadoop and MongoDB together to create complete big data applications: MongoDB powers the online, real-time operational application, while Hadoop consumes data from MongoDB and blends it with data from other operational systems to fuel sophisticated analytics and machine learning.

This is part one of a three-part series on MongoDB and Hadoop:

1. Introduction & Setup of Hadoop and MongoDB
2. Hive Example
3. Spark Example & Key Takeaways

**Introduction & Setup of Hadoop and MongoDB**

There are many, many data management technologies available today, and that makes it hard to discern hype from reality. Working at MongoDB Inc., I know the immense value of MongoDB as a great real-time operational database for applications; however, for analytics and batch operations, I wanted to understand more clearly the options available and when to use some of the other great options like Spark.

I started with a simple example: taking 1-minute time series intervals of stock prices, with the opening (first) price, high (max), low (min), and closing (last) price of each time interval, and turning them into 5-minute intervals (called OHLC bars). The 1-minute data is stored in MongoDB and is then processed in Hive or Spark via the MongoDB Hadoop Connector, which allows MongoDB to be an input or output to/from Hadoop and Spark.

A more typical scenario is that you record this market data in MongoDB for real-time purposes but then run offline analytical models in another environment. Of course the models would be far more complicated; this is just a Hello World-level example. I chose OHLC bars simply because that was the data I found easily.

**Summary**

- Use case: aggregating 1-minute intervals of stock prices into 5-minute intervals
- Input: 1-minute stock price intervals in a MongoDB database
- Simple analysis: performed in Hive and Spark
- Output: 5-minute stock price intervals in Hadoop

**Steps to Set Up the Environment**

- Set up the Hadoop environment – Hadoop is fairly involved to set up, but fortunately Cloudera makes VMs available with their distribution already installed, including both Hive and Spark. I downloaded VirtualBox (an open source VM manager) onto my Mac laptop to run the Cloudera VMs from here.
- Go through the tutorials – I went through the tutorials included in the VM, which were pretty helpful; sometimes I felt like I was just cutting and pasting without knowing what was happening, though, especially with Spark. One thing that is obvious from the tutorials is that the learning curve for using "Hadoop" includes learning many products in the ecosystem (Sqoop, Avro, Hive, Flume, Spark, etc.). If I were only doing this simple exercise, there is no way I would take on that learning curve, but of course some problems justify the effort.
- Download sample data – I Googled for some sample pricing data and found these 1-minute bars from this site.
- Install MongoDB on the VM – it is really easy with yum on CentOS, from this page.
- Start MongoDB – a default configuration file is installed by yum, so you can just run this to start on localhost and the default port 27017: `mongod -f /etc/mongod.conf`
- Load sample data – mongoimport allows you to load CSV files directly as flat documents in MongoDB. The command is simply: `mongoimport equities-msft-minute-bars-2009.csv --type csv --headerline -d marketdata -c minbars`
- Install the MongoDB Hadoop Connector – I ran through the steps at the link below to build it for Cloudera 5 (CDH5). One note: by default my "git clone" put the mongo-hadoop files in /etc/profile.d, but your repository might be set up differently. Also, one addition to the install steps is to set the path to mongoimport in build.gradle based on where you installed MongoDB. I used yum, and the path to the mongo tools was /usr/bin/. Install steps are here.

For the following examples, here is what a document looks like in the MongoDB collection (via the Mongo shell). You start the Mongo shell simply with the command "mongo" from the /bin directory of the MongoDB installation.

```
> use marketdata
> db.minbars.findOne()
{
    "_id" : ObjectId("54c00d1816526bc59d84b97c"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:30",
    "Day" : 24,
    "Open" : 24.41,
    "High" : 24.42,
    "Low" : 24.31,
    "Close" : 24.31,
    "Volume" : 683713
}
```
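If you want to confirm the import from Python before moving on to Hive and Spark, a quick check might look like the sketch below (my addition, not part of the original post; it assumes a current pymongo driver is installed):

```python
# Sanity check on the imported 1-minute bars (a sketch, not from the original post).
# Assumes pymongo 3.7+ is installed and mongod is running on localhost:27017.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
minbars = client.marketdata.minbars

# how many 1-minute bars were loaded?
print(minbars.count_documents({}))

# the first few MSFT bars of the trading day on 2009-08-24
# (string range comparison works because the timestamps are zero-padded)
for bar in minbars.find({"Symbol": "MSFT",
                         "Timestamp": {"$gte": "2009-08-24 09:30",
                                       "$lt": "2009-08-24 09:35"}}).sort("Timestamp", 1):
    print(bar["Timestamp"], bar["Open"], bar["Close"])
```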
Posts #2 and #3 in this blog series show examples of Hive and Spark using this setup.

1. Introduction & Setup of Hadoop and MongoDB
2. Hive Example
3. Spark Example & Key Takeaways

Want to learn more about Apache Spark and MongoDB? Read our white paper on turning analytics into real-time action.

Read more about Apache Spark and MongoDB

Read Part 2 >>

**About Matt Kalan**

Matt Kalan is a Sr. Solution Architect at MongoDB, with extensive experience helping more than 300 customers in financial services and other industries solve business problems with technology. Before MongoDB, Matt grew Progress Software's Apama Algorithmic Trading and Complex Event Processing (CEP) Platform business in North America and later sold broader operational intelligence solutions to FS firms. He previously worked for Caplin Systems, selling solutions to stream real-time market data over the web to FX and FI portals, and for Sapient, providing consulting services to Global 2000 clients.

February 17, 2015

The DBA's future as the Database Adviser

With such a large shift toward NoSQL technologies like MongoDB, there is a lot of discussion about the changing role of the Database Administrator (DBA). Many go so far as to say DBAs are no longer needed, an idea driven by new database capabilities that focus on agility (a dynamic schema instead of a fixed schema) and features that make operational management easier in areas like high availability and horizontal scaling. Based on my experience working with hundreds of customers implementing MongoDB in their organizations, there is still room for the DBA, even if their everyday tasks might take less time with MongoDB.

First, let's talk about the parts of the role that stay the same. There still needs to be someone to set up backup/recovery processes, handle capacity planning, run maintenance tasks (e.g. upgrades), diagnose issues, do configuration management, and set up replication and sharding. In enterprises, separate operational teams often handle security, monitoring, and diagnosing common issues, but that could be part of the DBA's role in some firms as well.

Notice I left out schema management. In MongoDB, the schema is NOT predefined in the database; it is determined at the application level, where the structure of each object is defined in the application team's favorite programming language. This is incredibly valuable when business units request new features or data to be added to the application: the application developer can simply add it, and no change needs to be made in the database, enabling rapid agility. In this case, the DBA is no longer the middleman and can focus on keeping the database up and running.

But in some cases, application changes are more complex, and development teams need a database expert to consult on schema design and its impact on performance, maintainability, and other factors. Here the role of the DBA transforms into that of the DB Adviser. In this role, the DB Adviser works closely with the application development team that implements the schema. The DBA provides the process and due diligence to manage the pace of change rather than enforce the limitations of a relational schema.

Implementing the schema in the application might be disconcerting to some DBAs and others who rightfully worry about application developers making uninformed decisions and bringing down an application. Letting go of schema management might be a difficult step, but DBAs already rely on application developers to make decisions in their application code on a regular basis. A poorly constructed algorithm will bring the system to a crawl, and querying fields in unexpected ways will cause performance problems too. Is choosing the schema design in the database so much more responsibility than any other development decision? The DBA should still guide developers, but without the enormous overhead of having to update a relational database schema simply to add a field. Your schema should be as dynamic as your business: agile, with the optimal amount of control.

The DBA has always been the person accountable for the reliability and performance of the database, alongside development teams. In today's market, the DBA will continue performing most of the activities they did in the former relational-only world, but will also advise in other areas to allow their business groups to innovate and iterate faster than their competition. In this shift, DBAs can take on new roles to help their businesses achieve more, faster.
See how you can help your organization get faster, better, and leaner with MongoDB.

May 14, 2014