Retail Reference Architecture Part 4: Recommendations and Personalizations

MongoDB

Technical

Series:

  1. Building a Flexible, Searchable, Low-Latency Product Catalog
  2. Approaches to Inventory Optimization
  3. Query Optimization and Scaling
  4. Recommendations and Personalizations

In the first three parts of our series on retail reference architecture, we focused on two practical applications of MongoDB in the retail space: product catalogs and inventory systems. Both of these are fairly conventional use cases where MongoDB acts as a system of record for a relatively static and straightforward collection of data. For example, in part one of our series where we focused on the product catalog, we used MongoDB to store and retrieve an inventory of items and their variants.

Today we’ll be looking at a very different application of MongoDB in the retail space, one that even those familiar with MongoDB might not think it is well suited for: logging a high volume of user activity data and performing data analytics. This final use case demonstrates how MongoDB can enable scalable insights, including recommendations and personalization for your customers.

Activity Logging

In retail, maintaining a record of each user’s activities gives a company the means to gain valuable predictive insight into user behavior, but it comes at a cost. For a retailer with hundreds of thousands or millions of customers, logging all of the activities generated by our customer base creates a huge amount of data, and storing that data in a useful and accessible way becomes a challenging task. The reason for this is that just about every activity performed by a user can be of interest to us, such as:

  • Search
  • Product views, likes or wishes
  • Shopping cart add/remove
  • Social network sharing
  • Ad impressions

Even from this short list, it’s easy to see how the amount of data generated can quickly become problematic, both in terms of the cost/volume of storage needed, and a company’s ability to utilize the data in a meaningful way. After all, we’re talking about potentially hundreds of thousands of writes per second, which means to gain any insights from our data set we are effectively trying to drink from the fire hose. The potential benefits, however, are huge.

With this type of data a retailer can gain a wealth of knowledge that will help predict user actions and preferences for the purposes of upselling and cross-selling. In short, the better any retailer can predict what their users want, the more effectively they can drive a consumer to additional products they may want to purchase.

Requirements

For MongoDB to meet the needs of our use case, we need it to handle the following requirements:

Ingestion of hundreds of thousands of writes per second: Normally MongoDB performs random access writes. In our use case this could lead to an unacceptable amount of disk fragmentation, so we used HVDF (more on this later) to store the data sequentially in an append-only fashion.

Flexible schema: To minimize the amount of storage space required, the schema of each activity being logged is stored in the same format and size as it is received.

Fast querying and sorting on varied fields: Secondary B-tree indexes ensure that our most common lookups and sorts will be performed in milliseconds.

Easy deletes of old data: Typically, deleting large numbers of documents is a relatively expensive operation in MongoDB. By time partitioning our data into collections using HVDF, we can instead drop entire collections, which is an essentially free operation.

Data Model

As mentioned earlier, one of the requirements of our solution is the use of flexible schemas so that data is stored in the same format it is received; however, we do still need to put some thought into a general data model for each activity being recorded.

The following is an example that outlines some of the attributes we may want to capture across all samples:

{
    _id: ObjectId(),
    geoCode: 1, // used to localize write operations
    sessionId: "2373BB…", // tracks activities across sessions
    device: {
        id: "1234", // tracks activities across different user devices
        type: "mobile/iphone",
        userAgent: "Chrome/34.0.1847.131"
    },
    type: "VIEW|CART_ADD|CART_REMOVE|ORDER|…", // type of activity
    itemId: "301671", // item that was viewed, added to cart, etc.
    sku: "730223104376", // item sku
    order: { id: "12520185", // info about orders associated with the activity
        … },
    location: [ -86.95444, 33.40178 ], // user's location when the activity was performed
    tags: [ "smartphone", "iphone", … ], // associated tags
    timeStamp: Date("2014/04/01 …") // time the activity was performed
}

This is just one possibility of what an activity might look like. A major concern here is to persist only as much information as is necessary for each activity type to minimize required disk space. As a result, each document will vary depending on the type of activity data being captured.
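As a rough illustration of persisting only what each activity type needs, the sketch below trims a raw event down to a per-type list of fields. The helper name `buildActivity` and the field lists are hypothetical, chosen for this example only; they are not part of HVDF or a recommended schema.

```javascript
// Hypothetical per-type field lists (an assumption for illustration):
// every activity keeps the common fields, plus only what its type needs.
const COMMON_FIELDS = ["geoCode", "sessionId", "device", "type", "timeStamp"];
const FIELDS_BY_TYPE = {
  VIEW: ["itemId", "sku", "tags"],
  CART_ADD: ["itemId", "sku"],
  CART_REMOVE: ["itemId", "sku"],
  ORDER: ["itemId", "sku", "order", "location"],
};

// Copy only the allowed fields that are actually present on the raw event.
function buildActivity(raw) {
  const allowed = COMMON_FIELDS.concat(FIELDS_BY_TYPE[raw.type] || []);
  const doc = {};
  for (const field of allowed) {
    if (raw[field] !== undefined) doc[field] = raw[field];
  }
  return doc;
}

const view = buildActivity({
  type: "VIEW",
  geoCode: 1,
  itemId: "301671",
  tags: ["smartphone"],
  order: { id: "12520185" }, // not needed for a VIEW, so it is dropped
  timeStamp: new Date(),
});
console.log(Object.keys(view));
```

Because each document carries only the fields relevant to its type, a VIEW sample stays smaller than an ORDER sample, which is the point of the flexible schema here.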

High Volume Data Feed (HVDF)

HVDF is an open-source framework created by the team at MongoDB, which makes it easy to efficiently validate, store, index, query and purge time series data in MongoDB via a simple REST API.

In HVDF, incoming data consists of three major components:

  • Feed: Maps to a database. In our use case, we will have one feed per user to capture all of their activities.
  • Channel: Maps to a collection. Each channel represents an activity type or source being logged.
  • Sample: Maps to a document. A separate document is written for each user activity being logged.

HVDF allows us to easily use MongoDB in an append-only fashion, meaning that our documents will be stored sequentially, minimizing any wasted space when we write to disk. HVDF also handles a number of configuration details that make MongoDB more efficient for this type of high-volume data storage, including time-based partitioning and disabling the default power of 2 sizes disk allocation strategy.

More information and the source code for HVDF are available on GitHub.

Time Partitioning

In our usage, we take advantage of HVDF’s time slicing feature, which automatically time partitions samples by creating a new collection at a specified time interval. This has five advantages:

  • Sequential writes: Since a new collection is created for each time interval, samples are always written to disk sequentially.

  • Fast deletes: By treating the data as sets, deletes are essentially free. All we need to do is drop the collection.

  • Index size: User activity logging creates a huge amount of data. Were we to create one collection for all samples per channel, our index of those samples could quickly become huge. By slicing our channels into time intervals, we keep our indexes small enough to fit in RAM, meaning queries using those indexes remain very performant.

  • Collections optimized for reads: The time interval for each collection can be configured to match the interval we are most likely to want to retrieve. In general, a best practice is to keep time partitions small enough that their indexes will fit in RAM, but large enough that you will only need to query across two collections for any given query.

  • Automatic sharding: HVDF automatically creates a shard key for each time interval as the collection is created. This ensures any lookups for a given time interval are performant, since samples for the same interval are written to the same shard.

To specify how our data will be time partitioned, we simply pass the following to our channel configuration:

{
    "time_slicing" : 
    {
        "type"   : "periodic",
        "config" : { "period" : {"weeks" : 4} }
    }
}

In this example, we are configuring HVDF to create a new collection per channel every 4 weeks. The ‘period’ may be specified in years, weeks, hours, minutes, seconds, milliseconds or any combination of these.
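To make the effect of periodic slicing more concrete, here is a small sketch of how a timestamp could map to a time slice, which slices a time-bounded query would touch, and which slices are old enough to drop. It assumes slices are aligned to the Unix epoch and identified by their start time; HVDF's actual collection naming and alignment may differ, and the function names are hypothetical.

```javascript
// Assumption: slices are aligned to the Unix epoch and identified by the
// start time of their period. HVDF's real scheme may differ.
const WEEK_MS = 7 * 24 * 60 * 60 * 1000;
const PERIOD_MS = 4 * WEEK_MS; // corresponds to { "weeks": 4 }

// Start time (ms since epoch) of the slice that contains ts.
function sliceStart(ts, periodMs) {
  return Math.floor(ts / periodMs) * periodMs;
}

// Start times of every slice a query over [fromTs, toTs] must touch.
function slicesForRange(fromTs, toTs, periodMs) {
  const slices = [];
  for (let s = sliceStart(fromTs, periodMs); s <= toTs; s += periodMs) {
    slices.push(s);
  }
  return slices;
}

// Slices that end at or before the cutoff can be dropped as whole collections.
function expiredSlices(allSliceStarts, cutoffTs, periodMs) {
  return allSliceStarts.filter((start) => start + periodMs <= cutoffTs);
}

const now = Date.now();
// A one-week query against four-week slices touches one or two slices.
console.log(slicesForRange(now - WEEK_MS, now, PERIOD_MS).length);
```

A query window shorter than the slice period never spans more than two slices, which is why the period is usually chosen to be at least as long as the typical query window.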

Setting _id

To keep our queries highly performant, we also need to put some thought into the construction of the ‘_id’ for our documents. Since we know this data is going to be used primarily for performing time-bounded user analytics, it’s logical to choose an ‘_id’ that embeds both the creation timestamp and the ‘userId’. HVDF makes this very simple. All we need to do is specify that HVDF should use the ‘source_time_document’ id_type channel plugin in the channel configuration:

```
"storage" :
{
    "type"   : "raw",
    "config" : 
    {
        "id_factory" : 
        {
            "type"   : "source_time_document",
            "config" : {  }
        }       
    }
}   
```

This will give us an ‘_id’ that looks like this:

```
"_id" : {
    "source" : "user457",
    "ts" : NumberLong(324000000)
}
```

By using this format for ‘_id’, we guarantee near instantaneous queries for many of the most common queries we will need in order to perform analytics on our data.
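One reason this format helps: MongoDB compares embedded documents field by field, in order, so a range condition on ‘_id’ can select a single source's samples within a time window using only the default ‘_id’ index. The sketch below builds such a filter. The helper name `idRangeFilter` is hypothetical, and the approach assumes the ‘_id’ fields are always stored in the order source, then ts, as in the example above.

```javascript
// Build a filter that selects one source's samples with fromTs < ts < toTs.
// Assumption: _id is an embedded document whose fields are stored in the
// order { source, ts }.
function idRangeFilter(source, fromTs, toTs) {
  return {
    _id: {
      $gt: { source: source, ts: fromTs },
      $lt: { source: source, ts: toTs },
    },
  };
}

const filter = idRangeFilter("user457", 323000000, 325000000);
console.log(JSON.stringify(filter));
// In the mongo shell, this object could be passed to find() on whichever
// time-slice collection is being queried.
```

Field order matters here: a filter built as { ts, source } would not compare the way the stored ‘_id’ values do.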

User Insights

Now that we’ve figured out how our users’ activities will be logged in MongoDB, it’s time to actually look at what we can do with that data. The first important thing to keep in mind is that, as mentioned earlier, we are trying to drink from the fire hose. Capturing this type of user data very often leads to huge data sets, so all of our queries must be time bound. Not only does this limit the number of collections that are hit by each query thanks to our time partitioning, it also keeps the amount of data being searched sane. When dealing with terabytes of data, a full collection scan isn’t even an option.

Queries

Here are a couple of simple queries we might commonly use to gain some insight into a user’s behavior. These would require secondary indexes on ‘userId’ and ‘itemId’:

  • Recent activity for a user:
db.activities.find({ userId: "u123",
           ts: { "$gt": 1301284969946, // time bound the query
                 "$lt": 1425657300000 } })
            .sort({ ts: -1 }).limit(100) // sort in desc order
  • Recent activity for a product:
db.activities.find({ itemId: "301671", // requires a secondary index on itemId + timestamp
           ts: { "$gt": 1301284969946,
                 "$lt": 1425657300000 } })
            .sort({ ts: -1 }).limit(100)

To gain even more insight into what our users are doing, we can also make use of the aggregation framework in MongoDB:

  • Recent number of views, purchases, etc. for a user:
db.activities.aggregate([
   { $match: { userId: "u123", ts: { $gt: DATE }}}, // get a time bound set of activities for a user (DATE is a placeholder for the lower time bound)
   { $group: { _id: "$type", count: { $sum: 1 }}}]) // group and sum the results by activity type
  • Recent total sales for a user:
db.activities.aggregate([
   { $match: { userId: "u123", ts: { $gt: DATE }, type: "ORDER" }},
   { $group: { _id: "result", count: { $sum: "$total" }}}]) // sum the total of all orders
  • Recent number of views, purchases, etc. for an item:
db.activities.aggregate([
   { $match: { itemId: "301671", ts: { $gt: DATE }}},
   { $group: { _id: "$type", count: { $sum: 1 }}}])
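For readers less familiar with the aggregation framework, the following plain-JavaScript sketch shows what a $match followed by a $group with $sum: 1 computes, using a small in-memory array instead of a collection. It is only an illustration of the logic; the function name and sample data are made up for this example.

```javascript
// Plain-JavaScript equivalent of the $match + $group pipeline, for illustration.
function countByType(activities, userId, sinceTs) {
  const counts = {};
  for (const a of activities) {
    // $match: keep only this user's activities newer than sinceTs
    if (a.userId !== userId || !(a.ts > sinceTs)) continue;
    // $group by type with { $sum: 1 }
    counts[a.type] = (counts[a.type] || 0) + 1;
  }
  return counts;
}

// Made-up sample data.
const sampleActivities = [
  { userId: "u123", type: "VIEW", ts: 100 },
  { userId: "u123", type: "VIEW", ts: 200 },
  { userId: "u123", type: "ORDER", ts: 300 },
  { userId: "u123", type: "VIEW", ts: 5 },   // older than the time bound
  { userId: "u456", type: "VIEW", ts: 400 }, // different user
];
const counts = countByType(sampleActivities, "u123", 50);
console.log(counts);
```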

There are, of course, many different things the results of these queries might tell us about our users’ behavior. For example, over time, we might see that the same user has viewed the same item multiple times, meaning we might want to suggest that item to them the next time they visit our site. Similarly, we may find that a particular item has had a surge in purchases due to a sale or ad campaign, meaning we might want to suggest it to users more frequently to further drive up sales.

Up-selling is a big possibility here as well. For example, let’s say we know a user has looked at several different low-end cell phones, but that in general their total purchases are relatively high. In this case, we might suggest that they look at a higher-end device that is popular amongst users with similar stats.

Map-reduce

Another option for performing analytics with MongoDB is to use map-reduce. This is often a good choice for very large data sets when compared with the aggregation framework for a few reasons. For example, take the following aggregation that calculates the total number of times an activity has been logged in the past hour for all unique visitors:

 var oneHourAgo = Date.now() - 60 * 60 * 1000; // assumes ts holds epoch milliseconds
 db.activities.aggregate([
   { $match: { ts: { $gt: oneHourAgo } }},
   { $group: { _id: "$userId", count: { $sum: 1 } } }],
   { allowDiskUse: true })

Here we have several problems:

  • allowDiskUse: In MongoDB, each aggregation pipeline stage is limited to 100MB of RAM, and a stage that exceeds that limit throws an error, since the aggregation framework is designed to be executed in RAM. Here, the allowDiskUse option avoids the error by letting stages write temporary data to disk, which, as you might imagine, gets extremely slow for a very large data set.

  • Single shard final grouping: Calculating uniques is hard for any system, but there is a particular aspect of the aggregation framework that can make performance suffer heavily. In a sharded MongoDB instance, our aggregation would first be performed per shard. Next, the results would be recombined on the primary shard and then the aggregation would be run again to remove any duplication across shards. This can become prohibitively expensive for a very large result set, particularly if the data set is large enough to require disk to be used for the aggregation.
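The two-phase grouping described above can be pictured with a small in-memory sketch. The function names are hypothetical, and the code only imitates the data flow between shards and the merging node; it is not how MongoDB implements it.

```javascript
// Phase 1: each shard groups its own documents by userId.
function groupOnShard(docs) {
  const partial = new Map();
  for (const d of docs) {
    partial.set(d.userId, (partial.get(d.userId) || 0) + 1);
  }
  return partial;
}

// Phase 2: a single node merges the partial results from every shard.
// It receives up to one entry per user per shard, so its work grows with
// the number of distinct users, not just with the number of shards.
function mergePartials(partials) {
  const merged = new Map();
  for (const partial of partials) {
    for (const [userId, count] of partial) {
      merged.set(userId, (merged.get(userId) || 0) + count);
    }
  }
  return merged;
}

// Made-up data: the same user can appear on more than one shard.
const shardA = [{ userId: "u1" }, { userId: "u2" }, { userId: "u1" }];
const shardB = [{ userId: "u1" }, { userId: "u3" }];
const merged = mergePartials([groupOnShard(shardA), groupOnShard(shardB)]);
console.log(Array.from(merged.entries()));
```

With millions of unique visitors, the merge step alone has to hold and combine millions of entries on one node, which is where the cost comes from.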

For these reasons, while map-reduce is not always a more performant option than the aggregation framework, it is better for this particular use case. Plus, the implementation is quite simple. Here’s the same query performed with map-reduce:

 var oneHourAgo = Date.now() - 60 * 60 * 1000; // assumes ts holds epoch milliseconds
 var map = function() { emit(this.userId, 1); } // emit samples by userId with value of 1
 var reduce = function(key, values)
                          { return Array.sum(values); } // sum the total samples per user
 db.activities.mapReduce(map, reduce,
            { query: { ts: { $gt: oneHourAgo } }, // time bound the query to the last hour
              out: { replace: "lastHourUniques", sharded: true } }) // persist the output to a collection

From here we can execute queries against the resulting collection:

  • Number of activities for a user in the last hour
db.lastHourUniques.find({ _id: "u123" }) // map-reduce output is keyed by _id, with the result in value
  • Total uniques in the last hour
db.lastHourUniques.count()
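To see what the job computes without a cluster, here is a minimal in-memory imitation of map-reduce. It is a sketch only, not MongoDB's implementation: the `runMapReduce` helper is hypothetical, and `emit` is passed to the map function as an argument rather than being a global as it is in MongoDB. It does mimic two real properties: output documents have the shape { _id, value }, and reduce may be called more than once for the same key, so it must accept its own output as input.

```javascript
// Minimal in-memory imitation of map-reduce (not MongoDB's implementation).
function runMapReduce(docs, map, reduce, chunkSize) {
  const emitted = new Map(); // key -> array of emitted values
  const emit = (key, value) => {
    if (!emitted.has(key)) emitted.set(key, []);
    emitted.get(key).push(value);
  };
  for (const doc of docs) map.call(doc, emit);

  const out = [];
  for (const [key, values] of emitted) {
    // Reduce in chunks, then reduce the partial results again, to mimic
    // reduce being invoked several times for one key.
    const partials = [];
    for (let i = 0; i < values.length; i += chunkSize) {
      const chunk = values.slice(i, i + chunkSize);
      partials.push(chunk.length > 1 ? reduce(key, chunk) : chunk[0]);
    }
    const value = partials.length > 1 ? reduce(key, partials) : partials[0];
    out.push({ _id: key, value: value });
  }
  return out;
}

// Made-up samples: u123 was active three times, u456 once.
const docs = [
  { userId: "u123" }, { userId: "u123" }, { userId: "u456" }, { userId: "u123" },
];
const result = runMapReduce(
  docs,
  function (emit) { emit(this.userId, 1); },
  (key, values) => values.reduce((a, b) => a + b, 0),
  2
);
console.log(result);
```

Summation works here because adding partial sums gives the same answer as adding the original values, which is the property a reduce function needs.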

Cross-selling

Now to put it all together. One of the most valuable features our analytics can enable is the ability to cross-sell products to our users. For example, let’s say a user buys a new iPhone. There’s a vast number of related products they might also want to buy, such as cases, screen protectors, headphones, etc. By analyzing our user activity data, we can actually figure out what related items are commonly bought together and present them to the user to drive sales of additional items. Here’s how we do it.

First, we calculate the items bought by a user:

 var oneDayAgo = Date.now() - 24 * 60 * 60 * 1000; // assumes ts holds epoch milliseconds
 var map = function() { emit(this.userId, { items: [ this.itemId ] }); } // emit each sample as userId: { items: [ itemId ] }
 var reduce = function(key, values) { // merge the item arrays; reduce must return an object, not a bare array
     var items = [];
     values.forEach(function(v) { items = items.concat(v.items); });
     return { items: items };
 }
 db.activities.mapReduce(map, reduce,
  { query: { type: "ORDER", ts: { $gt: oneDayAgo }}, // include only activities of type order in the last day
    out: { replace: "lastDayOrders", sharded: true } }) // persist the output to a collection

This will give an output collection that contains documents that look like this:

{ _id: "u123", value: { items: [ 2, 3, 8 ] } }

Next, we run a second map-reduce job on the output ‘lastDayOrders’ collection to compute the number of occurrences of each item pair:

 var map = function() {
    var items = this.value.items;
    for (var i = 0; i < items.length; i++) {
        for (var j = i + 1; j < items.length; j++) {
            emit({ a: items[i], b: items[j] }, 1); // emit each item pair
        }
    }
}

 var reduce = function(key, values)
                      { return Array.sum(values); } // Sum all occurrences of each item pair

 db.lastDayOrders.mapReduce(map, reduce,
  { out: { replace: "pairs", sharded: true } }) // persist the output to a collection

This will give us an output collection that contains a count of how many times each set of items has been ordered together. The collection will contain documents that look like this:

{ _id: { a: 2, b: 3 }, value: 36 }
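The pair counting itself is easy to try out in memory. The sketch below counts how often two items appear in the same user's list of ordered items. Two details are additions made for this illustration and are not part of the jobs above: items are de-duplicated per user, and each pair is sorted so that (2, 3) and (3, 2) count as the same pair. It also assumes numeric item IDs, and the function name is hypothetical.

```javascript
// Count how often each unordered pair of items appears in the same list.
// Assumption: item IDs are numbers, so they can be sorted numerically.
function countPairs(orders) {
  const counts = new Map(); // "a|b" -> number of lists containing both
  for (const order of orders) {
    // De-duplicate and sort so (2, 3) and (3, 2) map to the same key.
    const items = Array.from(new Set(order.items)).sort((x, y) => x - y);
    for (let i = 0; i < items.length; i++) {
      for (let j = i + 1; j < items.length; j++) {
        const key = items[i] + "|" + items[j];
        counts.set(key, (counts.get(key) || 0) + 1);
      }
    }
  }
  return counts;
}

// Made-up data: two users, both of whom bought items 2 and 3.
const pairCounts = countPairs([
  { items: [2, 3, 8] },
  { items: [3, 2] },
]);
console.log(Array.from(pairCounts.entries()));
```

Note that the inner loop stops before the end of the array; running it one step further would pair the last item with an undefined value.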

All we need to do from here is to create a secondary index on each item and its count, e.g. { "_id.a": 1, value: -1 }. This makes determining which items to cross-sell a matter of a simple query to retrieve the items most commonly ordered together. In the following example, we retrieve only those items that have been ordered with itemId 2 more than 10 times, and sort in descending order:

db.pairs.find( { "_id.a": 2, value: { $gt: 10 }} )
             .sort({ value: -1 })

We can also optionally make the data more compact by grouping the most popular pairs in a separate collection, like this:

{ itemId: 2, recom: [ { itemId: 32, weight: 36}, 
                        { itemId: 158, weight: 23},
                           … ] }
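One way to produce documents of this compact shape is to regroup the pair counts by item. The sketch below does this in memory. The function name, the minimum-count threshold, and the top-N cutoff are assumptions made for this example. It also adds each pair in both directions, so that an item can be looked up regardless of whether it was stored as a or b.

```javascript
// Turn pair counts ({ _id: { a, b }, value }) into one document per item,
// listing that item's most frequent companions first.
function buildRecommendations(pairs, minCount, topN) {
  const byItem = new Map();
  const add = (itemId, other, weight) => {
    if (!byItem.has(itemId)) byItem.set(itemId, []);
    byItem.get(itemId).push({ itemId: other, weight: weight });
  };
  for (const p of pairs) {
    if (p.value < minCount) continue; // skip rarely seen pairs
    add(p._id.a, p._id.b, p.value); // a pair recommends in both directions
    add(p._id.b, p._id.a, p.value);
  }
  return Array.from(byItem, ([itemId, recom]) => ({
    itemId: itemId,
    recom: recom.sort((x, y) => y.weight - x.weight).slice(0, topN),
  }));
}

// Made-up pair counts.
const samplePairs = [
  { _id: { a: 2, b: 158 }, value: 23 },
  { _id: { a: 2, b: 32 }, value: 36 },
  { _id: { a: 2, b: 7 }, value: 3 }, // below the threshold, ignored
];
const recs = buildRecommendations(samplePairs, 10, 5);
console.log(JSON.stringify(recs.find((r) => r.itemId === 2)));
```

Serving recommendations then becomes a single lookup by itemId, at the cost of rebuilding this collection whenever the pair counts are refreshed.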

Hadoop Integration

For many applications, the aggregation framework (for real-time data analytics) and map-reduce (for less frequent handling of large data sets) are great options. For complex number crunching of truly massive data sets, however, Hadoop is going to be the tool of choice. For this reason, we have created a Hadoop connector for MongoDB that makes processing data from a MongoDB cluster in Hadoop a much simpler task. Here are just a few of the features:

  • Read/write from MongoDB or exported BSON
  • Integration with Pig, Spark, Hive, MapReduce and others
  • Works with Apache Hadoop, Cloudera CDH, Hortonworks HDP
  • Open source
  • Support for filtering with MongoDB queries, authentication, reading from replica set tags, and more

For more information on the MongoDB Hadoop connector, check out the project on GitHub.

Recap

Performing user analytics, whether in real-time or incrementally, is no small task. Every step of the process, from capturing user data, to data storage, to data retrieval, to crunching the numbers, presents challenges that require significant planning if the system is to perform well. Luckily, MongoDB and HVDF can be great tools for accomplishing these types of tasks, either on their own or used in conjunction with other tools like Hadoop.

That’s it for our four part series on retail reference architecture! In just four blog posts we’ve covered how MongoDB can be used to power some of the most common and complex needs of large-scale retailers, including product catalogs, inventory, and query optimization. For more tools and examples of what MongoDB can do for you, check out our GitHub.

Learn more

To discover how you can re-imagine the retail experience with MongoDB, read our white paper. In this paper, you'll learn about the new retail challenges and how MongoDB addresses them.

