
How to Leverage an Event-Driven Architecture with MongoDB and Databricks

Francesco Baldissera • 9 min read • Published Jul 13, 2023 • Updated Jul 13, 2023
Spark • Aggregation Framework • Python • JavaScript • MongoDB
Follow along with this tutorial to get a detailed view of how to leverage MongoDB Atlas App Services in addition to Databricks model building and deployment capabilities to fuel data-driven strategies with real-time events data. Let’s get started!

The basics

We’re going to use a MongoDB Atlas M10 cluster as the backend service for the solution. If you are not familiar with MongoDB Atlas yet, you can follow along with the Introduction to MongoDB course to start with the basics of cluster configuration and management.

Data collection

The solution is based on data that mimics what an event-driven architecture would ingest from an e-commerce storefront. We’re going to use a synthetic dataset to represent the events we would receive in our cloud database from a Kafka stream. The data source can be found on Kaggle.
The data is in a tabular format. When converted into an object suitable for MongoDB, it will look like this:
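As a rough sketch of that conversion, the snippet below turns one CSV row into a typed document. Only event_time, event_type, product_id, and price are named in this tutorial; the other column names, the timestamp format, and the sample values are assumptions made for illustration.

```python
from datetime import datetime, timezone


def row_to_document(row: dict) -> dict:
    """Convert one CSV row (all values are strings) into a typed event document."""
    # Assumed timestamp format: "2019-11-01 00:00:02 UTC".
    event_time = datetime.strptime(row["event_time"], "%Y-%m-%d %H:%M:%S UTC")
    return {
        # Store a real date so date operators can be applied in aggregations.
        "event_time": event_time.replace(tzinfo=timezone.utc),
        "event_type": row["event_type"],
        "product_id": int(row["product_id"]),
        "price": float(row["price"]),
        # The fields below are assumed column names, not confirmed by the text.
        "category_id": row.get("category_id"),
        "brand": row.get("brand") or None,
        "user_id": row.get("user_id"),
        "user_session": row.get("user_session"),
    }


# Made-up row, for illustration only.
sample_row = {
    "event_time": "2019-11-01 00:00:02 UTC",
    "event_type": "view",
    "product_id": "5802432",
    "price": "0.32",
    "category_id": "1000",
    "brand": "",
    "user_id": "2000",
    "user_session": "hypothetical-session-id",
}
document = row_to_document(sample_row)
```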
The event-driven architecture is very simple. It is made up of only four different events that a user can perform on the e-commerce site:
"view": A customer views a product on the product detail page.
"cart": A customer adds a product to the cart.
"remove_from_cart": A customer removes a product from the cart.
"purchase": A customer completes a transaction of a specific product.
The data in the Kaggle dataset is made of 4.6 million documents, which we will store in a database named "ecom_events" and under the collection "cosmetics". This collection represents all the events happening in a multi-category store during November 2019.
We’ve chosen this period specifically because it covers Black Friday promotions, so it is likely to include price changes, which makes it more interesting for evaluating the price elasticity of products.

Aggregate data in MongoDB

Using the powerful MongoDB Atlas Aggregation Pipeline, you can shape your data any way you need. We will shape the events in an aggregated view that will give us a “purchase log” so we can have historical prices and total quantities sold by product. This way, we can feed a linear regression model to get the best possible fit of a line representing the relationship between price and units sold.
Below, you’ll find the different stages of the aggregation pipeline:
  1. Match: We are only interested in purchase events, so we run a match stage for the event_type key having the value 'purchase'.
  2. Group: We are interested in knowing how many times a particular product was bought in a day and at what price. Therefore, we group by all the relevant keys, while we also do a data type transformation for the “event_time”, and we compute a new field, “total_sales”, to achieve daily total sales at a specific price point.
  3. Project: Next, we run a project stage to get rid of the object nesting resulting after the group stage. (Check out the MongoDB Compass Aggregation Pipeline Builder as you will be able to see the result of each one of the stages you add in your pipeline!)
  4. Group, Sort, and Project: We need a single object per product that holds its historic sales over time, a kind of time series log of daily aggregates. To build it, we group, sort, and project once more. Notice that the ‘$project’ stage also applies a data transformation to compute the ‘revenue’ generated by that product on each specific day.
  5. Out: The last stage of the pipeline pushes our properly shaped objects to a new collection called “purchase_log”. This collection will serve as the base to feed our model. The same aggregation pipeline will also be the baseline of a trigger function, further along, that automates the generation of this log every time there’s a purchase; in that case, we will use a $merge stage instead.
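The five steps above can be sketched as a pipeline expressed as a Python list of stages, in the form a driver such as PyMongo accepts. This is an illustrative reconstruction, not the original code: the field names "date" and "sales_history" are hypothetical, and it assumes event_time is stored as a date and that total_sales is a count of purchase events.

```python
def build_purchase_log_pipeline() -> list:
    """Return aggregation stages that turn raw events into a purchase log."""
    return [
        # 1. Match: keep purchase events only.
        {"$match": {"event_type": "purchase"}},
        # 2. Group: count purchases per product, price, and day.
        {"$group": {
            "_id": {
                "product_id": "$product_id",
                "price": "$price",
                "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$event_time"}},
            },
            "total_sales": {"$sum": 1},
        }},
        # 3. Project: flatten the nested _id produced by the group stage.
        {"$project": {
            "_id": 0,
            "product_id": "$_id.product_id",
            "price": "$_id.price",
            "date": "$_id.date",
            "total_sales": 1,
        }},
        # 4. Group, Sort, and Project: one document per product with its history.
        {"$group": {
            "_id": "$product_id",
            "sales_history": {"$push": {
                "date": "$date",
                "price": "$price",
                "total_sales": "$total_sales",
            }},
        }},
        {"$sort": {"_id": 1}},
        {"$project": {
            "sales_history": {"$map": {
                "input": "$sales_history",
                "as": "day",
                "in": {
                    "date": "$$day.date",
                    "price": "$$day.price",
                    "total_sales": "$$day.total_sales",
                    # Revenue for that product, price, and day.
                    "revenue": {"$multiply": ["$$day.price", "$$day.total_sales"]},
                },
            }},
        }},
        # 5. Out: write the result to the purchase_log collection.
        {"$out": "purchase_log"},
    ]


pipeline = build_purchase_log_pipeline()
```

With PyMongo, a list like this would typically be passed to the collection's aggregate method on the cosmetics collection.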
With this aggregation pipeline, we transform the raw events into the purchase log needed to understand each product’s historic sales by price. That log is the basis for a dashboard that helps category leads understand product sales, and for computing the price elasticity of demand of each product.

Intelligence layer: Building your model and deploying it to a Databricks endpoint

The goal of this stage is to be able to compute the price elasticity of demand of each product in real-time. Using Databricks, you can easily start up a cluster and attach your model-building Notebook to it.
In your Notebook, you can import MongoDB data using the MongoDB Connector for Spark, and you can also take advantage of the MLflow custom Python module library to write your Python scripts.
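The core of such a script could be a small elasticity estimator like the sketch below. The tutorial only says that a linear regression relates price to units sold; fitting the line on logarithms, so that the slope can be read directly as the elasticity, is an assumption made here and is not necessarily the author's model. It uses only the standard library and leaves out the Spark and MLflow wiring.

```python
import math


def price_elasticity(prices, quantities):
    """Estimate elasticity as the least-squares slope of ln(quantity) on ln(price)."""
    if len(prices) != len(quantities) or len(prices) < 2:
        raise ValueError("need at least two (price, quantity) observations")
    xs = [math.log(p) for p in prices]
    ys = [math.log(q) for q in quantities]
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        raise ValueError("need at least two distinct prices")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Slope of the fitted line in log-log space.
    return sxy / sxx


# Synthetic check: quantity = 100 * price ** -2 has a constant elasticity of -2.
example_prices = [1.0, 2.0, 4.0, 8.0]
example_quantities = [100.0 * p ** -2 for p in example_prices]
example_elasticity = price_elasticity(example_prices, example_quantities)
```

A product whose purchase log contains a single price point cannot be fitted, which is why the function raises instead of returning a value in that case.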
You can also log your experiments and then register them as models, so they can be served as endpoints from the UI:
  1. Log the model as an experiment directly from the Notebook.
  2. Check the logs of all the experiments associated with a given Notebook.
  3. From a specific experiment page, click on “Register Model” to have it saved for inference.
  4. From the model page, click on “deploy model” to get an endpoint URL.
Serving a model over an endpoint allows you to retrieve the endpoint URL, and you can also query the endpoint directly from the Databricks UI for testing purposes.
Once you have tested your model endpoint, it’s time to orchestrate your application to achieve real-time analytics.

Orchestrating your application

For this challenge, we’ll use MongoDB Triggers and Functions. Every time there’s a purchase event, we aggregate the data for the last product bought only, and then recalculate its price elasticity by passing its purchase log in an HTTP POST call to the Databricks endpoint.

Aggregating data after each purchase

First, you will need to set up an event stream that can capture changes in consumer behavior and price changes in real time, so that your purchase_log data is aggregated and updated.
By leveraging MongoDB App Services, you can build event-driven applications and integrate services in the cloud. For this use case, we would like to set up a Trigger that will “listen” for any new “purchase” event in the cosmetics collection. To get started with App Services, you can check out the documentation.
In your App Services dashboard, navigate to Triggers.
After clicking on “Add Trigger,” you can configure it to execute only when there’s a new insert in the collection:
Configuring the operation type by which the trigger will execute
Scrolling down the page, you can also configure the function that will be triggered:
Selecting which function you want to execute in response to the trigger
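The trigger itself is configured in the App Services UI, but the condition it enforces can be sketched in a few lines. The function below is a plain-Python illustration, not App Services code: it takes a change event with the standard operationType and fullDocument fields and decides whether the purchase function should run. The name should_fire is hypothetical.

```python
def should_fire(change_event: dict) -> bool:
    """Return True only for newly inserted purchase events."""
    is_insert = change_event.get("operationType") == "insert"
    document = change_event.get("fullDocument") or {}
    return is_insert and document.get("event_type") == "purchase"


# Hypothetical change events, for illustration only.
purchase_insert = {"operationType": "insert", "fullDocument": {"event_type": "purchase"}}
view_insert = {"operationType": "insert", "fullDocument": {"event_type": "view"}}
purchase_update = {"operationType": "update", "fullDocument": {"event_type": "purchase"}}
```

Filtering on the event type before the function runs keeps the aggregation from being recomputed for view and cart events, which make up most of the stream.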
Such functions can be defined (and tested) in the function editor. The function we’re using simply retrieves data from the cosmetics collection, performs some data processing on the information, and saves the result in a new collection.
The function is meant to shape the data from the last product_id purchased into the historic purchase_log needed to compute the price elasticity. The result is a document with historical price and total purchase data.
Note that we use the $merge stage so that we don’t overwrite the existing collection and only upsert the data corresponding to the most recently purchased item.
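A minimal sketch of that per-product pipeline follows, written as a Python list of stages rather than as the original App Services function. It is abbreviated and illustrative: the "sales_history" and "date" field names are hypothetical, event_time is assumed to be stored as a date, and the $merge options shown are one reasonable way to get upsert behavior.

```python
def build_trigger_pipeline(product_id) -> list:
    """Return stages that rebuild the purchase log for a single product."""
    return [
        # Only purchases of the product that was just bought.
        {"$match": {"event_type": "purchase", "product_id": product_id}},
        # Daily sales count per price point.
        {"$group": {
            "_id": {
                "product_id": "$product_id",
                "price": "$price",
                "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$event_time"}},
            },
            "total_sales": {"$sum": 1},
        }},
        {"$sort": {"_id.date": 1}},
        # One document for the product, holding its price and sales history.
        {"$group": {
            "_id": "$_id.product_id",
            "sales_history": {"$push": {
                "date": "$_id.date",
                "price": "$_id.price",
                "total_sales": "$total_sales",
            }},
        }},
        # Upsert into purchase_log instead of replacing the whole collection.
        {"$merge": {
            "into": "purchase_log",
            "on": "_id",
            "whenMatched": "replace",
            "whenNotMatched": "insert",
        }},
    ]


trigger_pipeline = build_trigger_pipeline(5802432)
```

Unlike $out, which replaces the target collection, $merge only touches the document whose _id matches the product, so the logs of all other products stay in place.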

Computing the price elasticity

The next step is to process the event stream and calculate the price elasticity of demand for each product. For this, you may set up a trigger so that every time there’s an insert or replace in the “purchase_log” collection, we send an HTTP POST request to retrieve the price elasticity.
Configuring the trigger to execute every time the collection has an insert or replace of documents
The trigger will execute a function such as the one below:
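The request logic of such a function can be sketched as follows. In App Services the function would be written in JavaScript; this Python version only illustrates how the request is built. The endpoint URL, the token, the "sales_history" field, and the "dataframe_records" payload shape are all assumptions, so check them against what your own model endpoint expects.

```python
import json
import urllib.request


def build_elasticity_request(endpoint_url: str, token: str, purchase_log_doc: dict):
    """Build (but do not send) an HTTP POST request carrying a product's purchase log."""
    # Assumed payload shape: one record per day in the product's history.
    payload = {"dataframe_records": purchase_log_doc["sales_history"]}
    return urllib.request.Request(
        endpoint_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Hypothetical document and endpoint, for illustration only.
example_doc = {
    "_id": 5802432,
    "sales_history": [
        {"date": "2019-11-01", "price": 1.0, "total_sales": 10},
        {"date": "2019-11-29", "price": 0.8, "total_sales": 16},
    ],
}
request = build_elasticity_request("https://example.invalid/invocations", "my-token", example_doc)
```

Sending the request, for example with urllib.request.urlopen, and writing the returned elasticity back to the product's document are left out so the sketch stays free of network calls.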

Visualize data with MongoDB Charts

Finally, you will need to visualize the data to make it easier for stakeholders to understand the price elasticity of demand for each product. You can use a visualization tool like MongoDB Charts to create dashboards and reports that show the price elasticity of demand over time and how it is impacted by changes in price, product offerings, and consumer behavior.

Evolving your apps

The new variable “price_elasticity” can be easily passed to the collections that feed your product information management system (PIMS), allowing developers to build another set of rules based on these values to automate a full-fledged dynamic pricing tool.
It can also be embedded into your applications: for example, an e-commerce CMS that your category leads use to manually adjust the prices of different products or, in this case, to build rules based on the price elasticity of demand that automate price setting.
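One very simple rule of this kind is sketched below. It relies on the standard result that, when demand is elastic (absolute elasticity above 1), lowering the price tends to raise revenue, while the opposite holds for inelastic demand. The function name and the returned labels are hypothetical, and a real pricing rule would also need to account for costs, margins, and stock.

```python
def suggest_price_action(price_elasticity: float) -> str:
    """Map a price elasticity of demand to a coarse, revenue-oriented suggestion."""
    magnitude = abs(price_elasticity)
    if magnitude > 1:
        # Elastic demand: quantity reacts more than proportionally to price.
        return "consider_lowering_price"
    if magnitude < 1:
        # Inelastic demand: quantity reacts less than proportionally to price.
        return "consider_raising_price"
    # Unit elasticity: revenue is roughly unchanged by small price moves.
    return "hold_price"


elastic_suggestion = suggest_price_action(-2.0)
inelastic_suggestion = suggest_price_action(-0.4)
```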
The same data can be used as a feature for forecasting total sales and creating a recommended price point for net revenue.
In conclusion, this framework can be used to build many kinds of real-time analytics use cases, wherever machine learning can serve as a source of intelligent and automated decision-making.
Find all the code used in the GitHub repository and drop by the Community Forum for any further questions, comments, or feedback!
