Powering Social Insights with MongoDB at uberVu

This is a guest post from the uberVU team.

Today, more than ever, marketers are being asked to show how their financial investments are driving tangible business results. We help them accomplish that goal. uberVU is a real-time social media marketing platform that allows organizations to leverage their social media data to better understand, connect with, and grow their online communities. We have an extensive client list including enterprise customers such as Heinz, NBC, World Bank, and Fujitsu.

We were recently acquired by HootSuite, and together our two products offer a complete and integrated feature set that addresses the entire social media lifecycle:

  • monitoring
  • metrics
  • reporting
  • engagement
  • collaboration

We have a five-year history in the social media monitoring market, and our evolving data storage architecture has played a key role in elevating our application’s value and user experience. For our data handling needs, we started with Tokyo Cabinet, SimpleDB, and MySQL and now use MongoDB, DynamoDB, S3, Glacier, and ElasticSearch.

Originally our team intended to use MongoDB as a secondary data store, but after a short, three-month implementation and adoption period in which it quickly gained traction internally, MongoDB was promoted to our primary data store.

Challenges

We collect and store social media content such as tweets, Facebook posts, blog posts, blog comments, etc. Each item is stored in the database as a separate document.

A stored tweet might look something like this:

{
    generator: 'twitter',
    content: 'This is a tweet example for ubervu',
    author: 'Vladimir Oane',
    gender: 'male',
    language: 'english',
    sentiment: 'positive',
    search: 'ubervu',
    published: 1391767879,
    ...
}

For our clients, relevant social media content must contain or match a predefined expression of interest in the designated ‘search’ field. In the example above, the tweet is collected and stored because it contains the string ‘ubervu’ in the content body.

Unique Index Structure

Our most common use case with MongoDB is performing a range query over a time frame for a fixed expression. For example, we might want to retrieve social media content that contains or matches the expression ‘ubervu’ between October 1st and November 23rd.

We structured MongoDB's unique index, _id, so that it can serve this query directly. For space considerations, we opted for a 64-bit integer and divided it into three parts:

  1. A hash of the search expression
  2. The entire published timestamp, in seconds
  3. An item id, which together with the timestamp should uniquely identify a document

To conduct a search for all “ubervu” mentions between timestamp1 and timestamp2, we simply run a range query on “_id”. The lower bound combines the hash of “ubervu” with timestamp1, and the upper bound combines the same hash with timestamp2.

In the lower bound, the item id portion is filled entirely with 0s; in the upper bound, it is filled entirely with 1s. This ensures the range covers every item published between timestamp1 and timestamp2, including items at the very edges of the interval.
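To make the scheme concrete, here is a minimal sketch in Python using pymongo. The exact bit allocation (23 bits of hash, 32 bits of timestamp, 8 bits of item id here) and the CRC32 hash are illustrative assumptions, not uberVU's actual layout.

# Sketch of a packed 64-bit _id: hash of the search expression, published
# timestamp in seconds, and an item id. The bit split is an assumption; the
# top bit is kept clear so the value fits in a signed 64-bit BSON integer.
import zlib

HASH_BITS, TS_BITS, ITEM_BITS = 23, 32, 8

def make_id(search, published, item_id):
    """Pack (search hash, timestamp, item id) into one 64-bit integer."""
    h = zlib.crc32(search.encode("utf-8")) & ((1 << HASH_BITS) - 1)
    return (h << (TS_BITS + ITEM_BITS)) | (published << ITEM_BITS) | item_id

def id_range(search, ts1, ts2):
    """Lower bound: item id bits all 0s. Upper bound: item id bits all 1s."""
    low = make_id(search, ts1, 0)
    high = make_id(search, ts2, (1 << ITEM_BITS) - 1)
    return low, high

# Range query over the _id index for all 'ubervu' mentions between ts1 and ts2:
# low, high = id_range("ubervu", ts1, ts2)
# collection.find({"_id": {"$gte": low, "$lte": high}})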

Efficient Filtering

Another very common use case is retrieving all the data that matches a criteria set. Within our application, the fields we can filter on are predefined (generator, language, sentiment, gender).

Efficient filtering is a challenge because the most obvious approach - creating indexes for every combination of filters - is not scalable as every added index costs storage space and has the potential to adversely affect write performance.

To improve query efficiency, we added an ‘attributes’ field into each document that consists of an encoded array of all the field values that can be used in a query. It looks like this:

attributes: [2041, 15, 178, 23 …]

Each numeric code in the array represents a property, such as “sentiment: positive” or “language: English”. We added an index over the ‘attributes’ field to speed up queries.

To retrieve all items matching a criteria set using the ‘attributes’ field, queries are run using the $all operator:

collection.find({attributes: {$all: [...]}})
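As an illustration, encoding the filters and building such a query might look roughly like the following. The numeric codes and their meanings are hypothetical; the real mapping is generated internally.

# Hypothetical property-to-code table and $all query builder.
PROPERTY_CODES = {
    ("generator", "twitter"): 2041,
    ("language", "english"): 15,
    ("sentiment", "positive"): 178,
    ("gender", "male"): 23,
}

def build_attributes_query(filters):
    """Translate {field: value} filters into an $all query over 'attributes'."""
    codes = [PROPERTY_CODES[(field, value)] for field, value in filters.items()]
    return {"attributes": {"$all": codes}}

# e.g. collection.find(build_attributes_query({"generator": "twitter",
#                                              "sentiment": "positive"}))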

A shortcoming of the $all operator is that, prior to MongoDB 2.6, the index is only used to match the first code in the ‘attributes’ array; the documents matching that code must then be retrieved from disk and checked against the rest of the query criteria.

In an effort to reduce the number of documents that need to be checked from disk for each query, we developed a system that sorts all numeric codes by how frequently they appear in the data store and then orders the elements in the ‘attributes’ fields according to that ranking.

For example, the property “generator: Twitter” (representing all tweets) is far more prevalent in the data store than the property “language: Romanian” (representing all content in Romanian). To obtain all tweets written in Romanian, it is more efficient to place the numeric code for “language: Romanian” first in the ‘attributes’ array: it is faster to retrieve all Romanian content and check whether each item is a tweet than to retrieve all tweets and check whether each one is written in Romanian.
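A minimal sketch of that ordering step, assuming a hypothetical frequency map maintained from periodic counts over the data store:

# Order codes so the least frequent (most selective) one comes first, since
# pre-2.6 only the first element of the $all array is matched via the index.
# The codes and frequencies below are made-up illustrations.
code_frequency = {
    2041: 120_000_000,  # e.g. "generator: twitter"
    90: 350_000,        # e.g. "language: romanian"
}

def order_by_rarity(codes):
    return sorted(codes, key=lambda c: code_frequency.get(c, 0))

# {"attributes": {"$all": order_by_rarity([2041, 90])}}  ->  $all: [90, 2041]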

The solution described above dramatically improved our query response time. MongoDB’s dynamic schema and rich query model made this possible.

Saving Storage Space

After realizing the fields in our documents would be relatively few in number and mostly consistent across the database, our team decided to impose a two-character limit on all field names (“generator” became “g”, “sentiment” became “s”, etc.).

This small change saved us 16% of our storage space, without any loss in information.
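A tiny sketch of how such a mapping can be applied transparently at the application boundary. Only the “generator” → “g” and “sentiment” → “s” abbreviations come from the text above; the remaining abbreviations are hypothetical.

# Hypothetical field-name mapping applied before writes and after reads.
FIELD_MAP = {
    "generator": "g",
    "content": "c",
    "author": "a",
    "gender": "ge",
    "language": "l",
    "sentiment": "s",
    "search": "se",
    "published": "p",
}
REVERSE_MAP = {short: full for full, short in FIELD_MAP.items()}

def shorten(doc):
    """Rename fields before insert to save storage on repeated key names."""
    return {FIELD_MAP.get(k, k): v for k, v in doc.items()}

def expand(doc):
    """Restore readable field names when reading documents back."""
    return {REVERSE_MAP.get(k, k): v for k, v in doc.items()}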

Our Infrastructure Setup

We have taken full advantage of the cloud computing resources available to efficiently deploy and scale our offerings. Our current infrastructure resides entirely on the AWS stack.

We currently have 30+ instances deployed and over 30 terabytes of storage in permanent use. All EBS-backed production data stores currently reside on XFS-formatted RAID arrays.

This storage architecture provides us with not only volume redundancy, but also performance boosts, which were much needed in the beginning when provisioned IOPS was not yet available to ensure EBS performance.

Our MongoDB setup consists of six production clusters, each with its own unique scope and usage pattern.

Our MongoDB clusters all have the same topology:

  • Each shard (shardA, shardB … shardZ) is a three-member replica set
  • Three ‘config server’ processes are deployed on separate instances
  • ‘mongos’ routing processes are spread throughout the whole system (web nodes, API nodes, worker nodes, etc.), as sketched below
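A minimal sketch of how application code interacts with this topology, assuming pymongo; the hostnames and replica set names are hypothetical.

# Each web/API/worker node runs its own mongos, so the application connects
# to the local router rather than to the shards directly.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")

# Registering a shard (run once against a mongos by an admin script):
# client.admin.command("addShard", "shardA/shardA-1:27018,shardA-2:27018,shardA-3:27018")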

Optimizing Performance and Redundancy with MongoDB

For us, relying on EBS-backed MongoDB clusters meant we had to familiarize ourselves with the concept of the working set: the amount of data that is regularly accessed during day-to-day operations. When the working set grows larger than RAM, the application is forced to read from disk, with an immediate loss in performance due to EBS I/O latency. Working sets can now be estimated with the working set estimator, first introduced in MongoDB 2.4.
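The estimator is exposed through the serverStatus command; a quick way to read it from Python might look like the sketch below (the connection string is hypothetical, and the workingSet section is only reported when explicitly requested on 2.4-era servers).

# Sketch: reading the working set estimator via serverStatus with pymongo.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
status = client.admin.command({"serverStatus": 1, "workingSet": 1})

ws = status.get("workingSet", {})
print("pages in working set:", ws.get("pagesInMemory"))
print("seconds of data covered:", ws.get("overSeconds"))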

To keep the working set from exceeding RAM, we first examined our data usage patterns in Graphite. One graph tracked the ‘write’ working set for one of the clusters; another tracked the ‘read’ working set generated by our API.

Using our data usage information, we defined the following access patterns:

                      RecentDB      HistoricalDB
  Read Working Set    0-20 days     20-90 days
  Write Working Set   0-20 days     0-90 days (data older than 20 days can be written directly here)
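In application code, this split translates into a simple routing rule when deciding which cluster a read should hit. A minimal sketch, assuming two pymongo clients and the 20-day boundary above (names are hypothetical):

# Hypothetical routing helper for the access patterns above.
import time

TWENTY_DAYS = 20 * 24 * 3600

def cluster_for_read(ts_from, recentdb_client, historicaldb_client):
    """Reads that only touch the last 20 days go to RecentDB; older reads go to HistoricalDB."""
    if time.time() - ts_from <= TWENTY_DAYS:
        return recentdb_client
    return historicaldb_client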

The architecture represented by the table above has been in place for more than three years and has proven itself on multiple occasions from both a performance and redundancy standpoint.

We currently use MongoDB Management Service (MMS) in addition to tools such as Graphite and Collectd for both monitoring and backup. These applications have been critical to managing our cloud-based cluster backups.

As our MongoDB-powered data store grew in size, the decision was made to implement an ‘inverted pyramid’ mechanism in an effort to provide the best possible response time while remaining cost efficient.

This mechanism relies on two main data stores, RecentDB and HistoricalDB, with the use of an in-house oplog replay tool that keeps the two clusters in sync.

The oplog - short for operations log - is a special capped collection that keeps a rolling record of all operations that modify the data stored in a MongoDB database, and is the basic mechanism that enables replication in MongoDB. Secondary nodes tail the oplog for new operations and replay them locally.

To implement the ‘inverted pyramid’ mechanism, we developed a process that connects the source cluster’s shards (RecentDB) to a destination cluster mongos router instance, verifies the last written timestamp, tails the source oplog from that timestamp onward, and finally replays all executed operations.
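A rough sketch of the idea behind that process is shown below, using pymongo. The actual in-house tool is not public; the hostnames, the checkpoint collection, and the simplified update handling here are all assumptions for illustration.

# Tail a source shard's oplog and replay operations through the destination mongos.
from pymongo import MongoClient, CursorType

source = MongoClient("mongodb://shardA-1:27018")            # one RecentDB shard member
dest = MongoClient("mongodb://historicaldb-mongos:27017")   # HistoricalDB router
checkpoints = dest["replay_meta"]["checkpoints"]             # hypothetical progress store

def replay_shard(shard_name):
    # 1. Find the last timestamp already applied for this shard.
    ckpt = checkpoints.find_one({"_id": shard_name})
    query = {"ts": {"$gt": ckpt["ts"]}} if ckpt else {}

    # 2. Tail the source oplog from that point onwards.
    oplog = source["local"]["oplog.rs"]
    for op in oplog.find(query, cursor_type=CursorType.TAILABLE_AWAIT):
        if op["op"] not in ("i", "u", "d"):
            continue  # skip no-ops and commands
        db_name, coll = op["ns"].split(".", 1)
        target = dest[db_name][coll]

        # 3. Replay each operation on the destination cluster.
        if op["op"] == "i":
            target.replace_one({"_id": op["o"]["_id"]}, op["o"], upsert=True)
        elif op["op"] == "d":
            target.delete_one(op["o"])
        elif op["op"] == "u":
            target.update_one(op["o2"], op["o"])  # assumes operator-style update documents

        # 4. Persist progress so the process can resume after a restart.
        checkpoints.replace_one({"_id": shard_name},
                                {"_id": shard_name, "ts": op["ts"]}, upsert=True)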

After taking into consideration our current settings and data volumes, we determined that an oplog replay window of 72-96 hours worked best for our clusters, as it left enough time to recover from any major failure at the cluster level (e.g. full replica set downtime, storage replacements, etc.).

In the current implementation, all five oplog replay processes (one per source shard) run on an administrative instance that is continually monitored for delays.

A key design step required in making our inverted pyramid possible was splitting our data store into five ‘segment’ databases, which are provisioned and depleted by two external jobs. This made it possible to drop data (at the db level) from the first two layers, RecentDB and HistoricalDB, in an orderly fashion without impacting any part of the application or compromising performance.

The last step of our data migration consists of offloading all data that passes the 90-day mark from the segment databases to S3. To accomplish this, each HistoricalDB secondary node is provisioned with two Python modules that parse through, collect, and export (in CSV format) all data older than 90 days. The legacy data is then uploaded into an S3 bucket and made available to other parts of our system.
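A simplified sketch of what such an export step might do, using pymongo, the csv module, and boto3. The bucket name, the field names, and the idea of filtering on a ‘published’ field are assumptions for illustration; the production modules are uberVU's own and run on the HistoricalDB secondaries.

# Export documents older than 90 days to CSV and upload the file to S3.
import csv
import time
import boto3
from pymongo import MongoClient

NINETY_DAYS = 90 * 24 * 3600

def export_old_items(collection, out_path):
    cutoff = int(time.time()) - NINETY_DAYS
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["generator", "content", "author", "sentiment", "published"])
        for doc in collection.find({"published": {"$lt": cutoff}}):
            writer.writerow([doc.get("generator"), doc.get("content"),
                             doc.get("author"), doc.get("sentiment"),
                             doc.get("published")])
    return out_path

def upload_to_s3(path, key):
    # Hypothetical bucket name; boto3 is used here for illustration.
    boto3.client("s3").upload_file(path, "ubervu-legacy-data", key)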

An added benefit of our data architecture is the ability to fall back on HistoricalDB in the unlikely event that a major issue impacts RecentDB. Although storing the 0-20 day data on both clusters costs extra storage space, having HistoricalDB on hand has proven useful in the past, with the AWS US-EAST-1 outage in the summer of 2012 being the most recent incident.

Conclusion

With MongoDB, we were able to quickly develop the query processes we needed to efficiently serve our customers, all on a flexible database architecture that stresses high performance and redundancy. MongoDB has been a partner that continues to deliver as we grow and tackle new challenges.

To learn more about how MongoDB can have a significant impact on your business, download our whitepaper How a Database Can Make Your Organization Faster, Better, Leaner.