Explore Developer Center's New Chatbot! MongoDB AI Chatbot can be accessed at the top of your navigation to answer all your MongoDB questions.

Join us at AWS re:Invent 2024! Learn how to use MongoDB for AI use cases.
MongoDB Developer
Atlas
plus
Sign in to follow topics
MongoDB Developer Centerchevron-right
Developer Topicschevron-right
Productschevron-right
Atlaschevron-right

Learn to Build AI-Enhanced Retail Search Solutions with MongoDB and Databricks

Francesco Baldissera, Ashwin Gangadhar, Vittal Pai14 min read • Published Sep 18, 2024 • Updated Sep 18, 2024
Node.jsKafkaAtlasVector SearchPython
Facebook Icontwitter iconlinkedin icon
Rate this tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
In the rapidly evolving retail landscape, businesses are constantly seeking ways to optimize operations, improve customer experience, and stay ahead of competition. One of the key strategies to achieve this is through leveraging the opportunities search experiences provide.
Imagine this: You walk into a department store filled with products, and you have something specific in mind. You want a seamless and fast shopping experience — this is where product displays play a pivotal role. In the digital world of e-commerce, the search functionality of your site is meant to be a facilitating tool to efficiently display what users are looking for.
Shockingly, statistics reveal that only about 50% of searches on retail websites yield the results customers seek. Think about it — half the time, customers with a strong buying intent are left without an answer to their queries.
The search component of your e-commerce site is not merely a feature; it's the bridge between customers and the products they desire. Enhancing your search engine logic with artificial intelligence is the best way to ensure that the bridge is sturdy.
In this article, we'll explore how MongoDB and Databricks can be integrated to provide robust solutions for the retail industry, with a particular focus on the MongoDB Apache Spark Streaming processor; orchestration with Databricks workflows; data transformation and featurization with MLFlow and the Spark User Defined Functions; and by building a product catalog index, sorting, ranking, and autocomplete with Atlas Search.
Let’s get to it!

Solution overview

High-level overview of an event-driven architecture for an AI-enhanced search solution using MongoDB Atlas and Databricks
A modern e-commerce-backed system should be able to collate data from multiple sources in real-time, as well as batch loads, and be able to transform this data into a schema upon which a Lucene search index can be built. This enables discovery of the added inventory.
The solution should integrate website customer behavior events data in real-time to feed an “intelligence layer” that will create the criteria to display and order the most interesting products in terms of both relevance to the customer and relevance to the business.
These features are nicely captured in the above-referenced e-commerce architecture. We’ll divide it into four different stages or layers:
  1. Multi-tenant streaming ingestion: With the help of the MongoDB Kafka connector, we are able to sync real-time data from multiple sources to Mongodb. For the sake of simplicity, in this tutorial, we will not focus on this stage.
  2. Stream processing: With the help of the MongoDB Spark connector and Databricks jobs and notebooks, we are able to ingest data and transform it to create machine learning model features.
  3. AI/ML modeling: All the generated streams of data are transformed and written into a unified view in a MongoDB collection called catalog, which is used to build search indexes and support querying and discovery of products.
  4. Building the search logic: With the help of Atlas Search capabilities and robust aggregation pipelines, we can power features such as search/discoverability, hyper-personalization, and featured sort on mobile/web applications.

Prerequisites

Before running the app, you'll need to have the following installed on your system:

Streaming data into Databricks

In this tutorial, we’ll focus on explaining how to orchestrate different ETL pipelines in real time using Databricks Jobs. A Databricks job represents a single, standalone execution of a Databricks notebook, script, or task. It is used to run specific code or analyses at a scheduled time or in response to an event.
Our search solution is meant to respond to real-time events happening in an e-commerce storefront, so the search experience for a customer can be personalized and provide search results that fit two criteria:
  1. Relevant for the customer: We will define a static score comprising behavioral data (click logs) and an Available to Promise status, so search results are products that we make sure are available and relevant based off of previous demand.
  2. Relevant for the business: The results will be scored based on which products are more price sensitive, so higher price elasticity means they appear first on the product list page and as search results. We will also compute an optimal suggested price for the product.
So let’s check out how to configure these ETL processes over Databricks notebooks and orchestrate them using Databricks jobs to then fuel our MongoDB collections with the intelligence that we will use to build our search experience.

Databricks jobs for product stream processing, static score, and pricing

We’ll start by explaining how to configure notebooks in Databricks. Notebooks are a key tool for data science and machine learning, allowing collaboration, real-time coauthoring, versioning, and built-in data visualization. You can also make them part of automated tasks, called jobs in Databricks. A series of jobs are called workflows. Your notebooks and workflows can be attached to computing resources that you can set up at your convenience, or they can be run via autoscale.
Learn more about how to configure jobs in Databricks using JSON configuration files.
You can find our first job JSON configuration files in our GitHub. In these JSON files, we specify the different parameters on how to run the various jobs in our Databricks cluster. We specify different parameters such as the user, email notifications, task details, cluster information, and notification settings for each task within the job. This configuration is used to automate and manage data processing and analysis tasks within a specified environment.
Now, without further ado, let’s start with our first workflow, the “Catalog collection indexing workflow.”

Catalog collection indexing workflow

Overview of a Databricks job, including two pipelines to ingest data from MongoDB collections, transform that data, and vectorize it using a text transformer model
The above diagram shows how our solution will run two different jobs closely related to each other in two separate notebooks. Let’s unpack this job with the code and its explanation:
The first part of your notebook script is where you’ll define and install different packages. In the code below, we have all the necessary packages, but the main ones — pymongo and tqdm — are explained below:
  • PyMongo is commonly used in Python applications that need to store, retrieve, or analyze data stored in MongoDB, especially in web applications, data pipelines, and analytics projects.
  • tqdm is often used in Python scripts or applications where there's a need to provide visual feedback to users about the progress of a task.
The rest of the packages are pandas, JSON, and PySpark. In this part of the snippet, we also define a variable for the MongoDB connection string to our cluster.
1%pip install pymongo tqdm
2
3
4import pandas as pd
5import json
6from collections import Counter
7from tqdm import tqdm
8from pymongo import MongoClient
9from pyspark.sql import functions as F
10from pyspark.sql import types as T
11from pyspark.sql import Window
12import pyspark
13from pyspark import SparkContext
14from pyspark.sql import SparkSession
15conf = pyspark.SparkConf()
16
17
18import copy
19import numpy as np
20
21
22tqdm.pandas()
23
24MONGO_CONN = 'mongodb+srv://:@retail-demo.2wqno.mongodb.net/?retryWrites=true&w=majority'

Data streaming from MongoDB

The script reads data streams from various MongoDB collections using the spark.readStream.format("mongodb") method.
For each collection, specific configurations are set, such as the MongoDB connection URI, database name, collection name, and other options related to change streams and aggregation pipelines.
The snippet below is the continuation of the code from above. It can be put in a different cell in the same notebook.
1atp = spark.readStream.format("mongodb").\ option('spark.mongodb.connection.uri', MONGO_CONN).\ option('spark.mongodb.database', "search").\ option('spark.mongodb.collection', "atp_status_myn").\ option('spark.mongodb.change.stream.publish.full.document.only','true').\ option('spark.mongodb.aggregation.pipeline',[]).\ option("forceDeleteTempCheckpointLocation", "true").load()
In this specific case, the code is reading from the atp_status collection. It specifies options for the MongoDB connection, including the URI, and enables the capture of the full document when changes occur in the MongoDB collection. The empty aggregation pipeline indicates that no specific transformations are applied at this stage.
Following with the next stage of the job for the atp_status collection, we can break down the code snippet into three different parts:

Data transformation and data writing to MongoDB

After reading the data streams, we drop the _id field. This is a special field that serves as the primary key for a document within a collection. Every document in a MongoDB collection must have a unique _id field, which distinguishes it from all other documents in the same collection. As we are going to create a new collection, we need to drop the previous _id field of the original documents, and when we insert it into a new collection, a new _id field will be assigned.
1atp = atp.drop("_id")

Data writing to MongoDB

The transformed data streams are written back to MongoDB using the writeStream.format("mongodb") method.
The data is written to the catalog_myn collection in the search database.
Specific configurations are set for each write operation, such as the MongoDB connection URI, database name, collection name, and other options related to upserts, checkpoints, and output modes.
The below code snippet is a continuation of the notebook from above.
1atp.writeStream.format("mongodb").\ option('spark.mongodb.connection.uri', MONGO_CONN).\ option('spark.mongodb.database', "search").\ option('spark.mongodb.collection', "catalog_myn").\ option('spark.mongodb.operationType', "update").\ option('spark.mongodb.upsertDocument', True).\ option('spark.mongodb.idFieldList', "id").\

Checkpointing

Checkpoint locations are specified for each write operation. Checkpoints are used to maintain the state of streaming operations, allowing for recovery in case of failures. The checkpoints are stored in the /tmp/ directory with specific subdirectories for each collection.
Here is an example of checkpointing. It’s included in the script right after the code from above.
1option("forceDeleteTempCheckpointLocation", "true").\ option("checkpointLocation", "/tmp/retail-atp-myn4/_checkpoint/").\ outputMode("append").\ start()
The full snippet of code performs different data transformations for the various collections we are ingesting into Databricks, but they all follow the same pattern of ingestion, transformation, and rewriting back to MongoDB. Make sure to check out the full first indexing job notebook.
For the second part of the indexing job, we will use a user-defined function (UDF) in our code to embed our product catalog data using a transformers model. This is useful to be able to build Vector Search features.
This is an example of how to define a user-defined function. You can define your functions early in your notebook so you can reuse them later for running your data transformations or analytics calculations. In this case, we are using it to embed text data from a document.
The ‘@F.udf()’ decorator is used to define a user-defined function in PySpark using the F object, which is an alias for the pyspark.sql.functions module. In this specific case, it is defining a UDF named ‘get_vec’ that takes a single argument text and returns the result of calling ‘model.encode(text)’.
The code from below is a continuation of the same notebook.
1@F.udf() def get_vec(text):
2 return model.encode(text)
Our notebook code continues with similar snippets to previous examples. We'll use the MongoDB Connector for Spark to ingest data from the previously built catalog collection.
1catalog_status = spark.readStream.format("mongodb").\ option('spark.mongodb.connection.uri', MONGO_CONN).\ option('spark.mongodb.database', "search").\ option('spark.mongodb.collection', "catalog_myn").\ option('spark.mongodb.change.stream.publish.full.document.only','true').\ option('spark.mongodb.aggregation.pipeline',[]).\ option("forceDeleteTempCheckpointLocation", "true").load()
Then, it performs data transformations on the catalog_status DataFrame, including adding a new column, the atp_status that is now a boolean value, 1 for available, and 0 for unavailable. This is useful for us to be able to define the business logic of the search results showcasing only the products that are available.
We also calculate the discounted price based on data from another job we will explain further along.
The below snippet is a continuation of the notebook code from above:
1catalog_status = catalog_status.withColumn("discountedPrice", F.col("price") * F.col("pred_price")) catalog_status = catalog_status.withColumn("atp", (F.col("atp").cast("boolean") & F.lit(1).cast("boolean")).cast("integer"))
We vectorize the title of the product and we create a new field called “vec”. We then drop the "_id" field, indicating that this field will not be updated in the target MongoDB collection.
1catalog_status.withColumn("vec", get_vec("title")) catalog_status = catalog_status.drop("_id")
Finally, it sets up a structured streaming write operation to write the transformed data to a MongoDB collection named "catalog_final_myn" in the "search" database while managing query state and checkpointing.
1catalog_status.writeStream.format("mongodb").\ option('spark.mongodb.connection.uri', MONGO_CONN).\ option('spark.mongodb.database', "search").\ option('spark.mongodb.collection', "catalog_final_myn").\ option('spark.mongodb.operationType', "update").\ option('spark.mongodb.idFieldList', "id").\ option("forceDeleteTempCheckpointLocation", "true").\ option("checkpointLocation", "/tmp/retail-atp-myn5/_checkpoint/").\ outputMode("append").\ start()
Let’s see how to configure the second workflow to calculate a BI score for each product in the collection and introduce the result back into the same document so it’s reusable for search scoring.

BI score computing logic workflow

Diagram overview of the BI score computing job logic using materialized views to ingest data from a MongoDB collection and process user click logs with Empirical Bayes algorithm.
In this stage, we will explain the script to be run in our Databricks notebook as part of the BI score computing job. Please bear in mind that we will only explain what makes this code snippet different from the previous, so make sure to understand how the complete snippet works. Please feel free to clone our complete repository so you can get a full view on your local machine.
We start by setting up the configuration for Apache Spark using the SparkConf object and specify the necessary package dependency for our MongoDB Spark connector.
1conf = pyspark.SparkConf() conf.set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.0")
Then, we initialize a Spark session for our Spark application named "test1" running in local mode. It also configures Spark with the MongoDB Spark connector package dependency, which is set up in the conf object defined earlier. This Spark session can be used to perform various data processing and analytics tasks using Apache Spark.
The below code is a continuation to the notebook snippet explained above:
1spark = SparkSession.builder \
2 .master("local") \
3 .appName("test1") \
4 .config(conf = conf) \
5 .getOrCreate()
We’ll use MongoDB Aggregation Pipelines in our code snippet to get a set of documents, each representing a unique "product_id" along with the corresponding counts of total views, purchases, and cart events. We’ll use the transformed resulting data to feed an Empirical Bayes algorithm and calculate a value based on the cumulative distribution function (CDF) of a beta distribution.
Make sure to check out the entire .ipynb file in our repository.
This way, we can calculate the relevance of a product based on the behavioral data described before. We’ll also use window functions to calculate different statistics on each one of the products — like the average of purchases and the purchase beta (the difference between the average total clicks and average total purchases) — to use as input to create a BI relevance score. This is what is shown in the below code:
1@F.udf(T.FloatType())
2
3def beta_fn(pct,a,b):
4 return float(100*beta.cdf(pct, a,b)) w = Window().partitionBy() df =
5df.withColumn("purchase_alpha", F.avg('purchase').over(w)) df = df.withColumn("cart_alpha", F.avg('cart').over(w)) df = df.withColumn("total_views_mean", F.avg('total_views').over(w)) df = df.withColumn("purchase_beta", F.expr('total_views_mean - purchase_alpha'))
6
7df = df.withColumn("cart_beta", F.expr('total_views_mean - cart_alpha')) df = df.withColumn("purchase_pct", F.expr('(purchase+purchase_alpha)/(total_views+purchase_alpha+purchase_beta)'))
8df = df.withColumn("cart_pct", F.expr('(purchase+cart_alpha)/(total_views+cart_alpha+cart_beta)'))
After calculating the BI score for our product, we want to use a machine learning algorithm to calculate the price elasticity of demand for the product and the optimal price.

Calculating optimal price workflow

Diagram showcasing the pricing workflows to be run as Databricks notebooks
For calculating the optimal recommended price, first, we need to figure out a pipeline that will shape the data according to what we need. Get the pipeline definition in our repository.
We’ll first take in data from the MongoDB Atlas click logs (clog) collection that’s being ingested in the database in real-time, and create a DataFrame that will be used as input for a Random Forest regressor machine learning model. We’ll leverage the MLFlow library to be able to run MLOps stages, run tests, and register the best-performing model that will be used in the second job to calculate the price elasticity of demand, the suggested discount, and optimal price for each product. Let’s see what the code looks like!
1model_name = "retail_competitive_pricing_model_1"
2with mlflow.start_run(run_name=model_name):
3 # Create and fit a linear regression model
4 model = RandomForestRegressor(n_estimators=50, max_depth=3)
5 model.fit(X_train, y_train)
6 wrappedModel = CompPriceModelWrapper(model)
7
8 # Log model parameters and metrics
9 mlflow.log_params(model.get_params())
10 mlflow.log_metric("mse", np.mean((model.predict(X_test) - y_test) ** 2))
11
12 # Log the model with a signature that defines the schema of the model's inputs and outputs.
13 # When the model is deployed, this signature will be used to validate inputs.
14 signature = infer_signature(X_train, wrappedModel.predict(None,X_train))
15
16 # MLflow contains utilities to create a conda environment used to serve models.
17 # The necessary dependencies are added to a conda.yaml file which is logged along with the model.
18 conda_env = _mlflow_conda_env(
19 additional_conda_deps=None,
20 additional_pip_deps=["scikit-learn=={}".format(sklearn.__version__)],
21 additional_conda_channels=None,
22 )
23 mlflow.pyfunc.log_model(model_name, python_model=wrappedModel, conda_env=conda_env, signature=signature)
After we’ve done the test and train split required for fitting the model, we leverage the mlFlow model wrapping to be able to log model parameters, metrics, and dependencies.
For the next stage, we apply the previously trained and registered model to the sales data:
1model_name = "retail_competitive_pricing_model_1"
2apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/staging")
3
4# Apply the model to the new data
5columns = ['old_sales','total_sales','min_price','max_price','avg_price','old_avg_price']
6udf_inputs = struct(*columns)
7udf_inputs
Then, we just need to create the sales DataFrame with the resulting data. But first, we use the .fillna function to make sure all our null values are cast into floats 0.0. We need to perform this so our model has proper data and because most machine learning models return an error if you pass null values.
Now, we can calculate new columns to add to the sales DataFrame: the predicted optimal price, the price elasticity of demand per product, and a discount column which will be rounded up to the next nearest integer. The below code is a continuation of the code from above — they both reside in the same notebook:
1sales = sales.fillna(0.0)
2sales = sales.withColumn("pred_price",apply_model_udf(udf_inputs))
3
4sales = sales.withColumn("price_elasticity", F.expr("((old_sales - total_sales)/(old_sales + total_sales))/(((old_avg_price - avg_price)+1)/(old_avg_price + avg_price))"))
5
6sales = sales.withColumn("discount", F.ceil((F.lit(1) - F.col("pred_price"))*F.lit(100)))
Then, we push the data back using the MongoDB Connector for Spark into the proper MongoDB collection. These will be used together with the rest as the baseline on top of which we’ll build our application’s search business logic.
1sales.select("id", "pred_price", "price_elasticity").write.format("mongodb").\ option('spark.mongodb.connection.uri', MONGO_CONN).\ option('spark.mongodb.database', "search").\ option('spark.mongodb.collection', "price_myn").\ option('spark.mongodb.idFieldList', 'id').\ mode('overwrite').\ save()
After these workflows are configured, you should be able to see the new collections and updated documents for your products.

Building the search logic

To build the search logic, first, you’ll need to create an index. This is how we’ll make sure that our application runs smoothly as a search query, instead of having to look into all the documents in the collection. We will limit the scan by defining the criteria for those scans.
To understand more about indexing in MongoDB, you can check out the article from the documentation. But for the purposes of this tutorial, let’s dive into the two main parameters you’ll need to define for building our solution:
Mappings: This key dictates how fields in the index should be stored and how they should be treated when queries are made against them.
Fields: The fields describe the attributes or columns of the index. Each field can have specific data types and associated settings. We implement the sortable number functionality for the fields ‘pred_price’, ‘price_elasticity’, and ‘score’. So in this way, our search results are organized by relevance.
The latter steps of building the solution come to defining the index mapping for the application. You can find the full mappings snippet in our GitHub repository.
To configure the index, you can insert the snippet in MongoDB Atlas by browsing your cluster splash page and clicking over the “Search” tab: Overview of the Search configuration panel for MongoDB Atlas collections
Next, you can click over “Create Index.” Make sure you select “JSON Editor”:
Overview of the JSON Editor functionality in the Search Configuration panel for MongoDB Atlas
Paste the JSON snippet from above — make sure you select the correct database and collection! In our case, the collection name is catalog_final_myn.

Autocomplete

To define autocomplete indexes, you can follow the same browsing instructions from the Building the search logic stage, but in the JSON editor, your code snippet may vary. Follow our tutorial to learn how to fully configure autocomplete in Atlas Search.
For our search solution, check out the code below. We define how the data should be treated and indexed for autocomplete features.
1{
2 "mappings": {
3 "dynamic": false,
4 "fields": {
5 "query": [
6 {
7 "foldDiacritics": false,
8 "maxGrams": 7,
9 "minGrams": 3,
10 "tokenization": "edgeGram",
11 "type": "autocomplete"
12 }
13 ]
14 }
15 }
16}
Let’s break down each of the parameters:
foldDiacritics: Setting this to false means diacritic marks on characters (like accents on letters) are treated distinctly. For instance, "résumé" and "resume" would be treated as different words.
minGrams and maxGrams: These specify the minimum and maximum lengths of the edge n-grams. In this case, it would index substrings (edgeGrams) with lengths ranging from 3 to 7.
Tokenization: The value edgeGram means the text is tokenized into substrings starting from the beginning of the string. For instance, for the word "example", with minGrams set to 3, the tokens would be "exa", "exam", "examp", etc. This is commonly used in autocomplete scenarios to match partial words.
After all of this, you should have an AI-enhanced search functionality for your e-commerce storefront!

Conclusion

In summary, we’ve covered how to integrate MongoDB Atlas and Databricks to build a performant and intelligent search feature for an e-commerce application.
By using the MongoDB Connector for Spark and Databricks, along with MLFlow for MLOps, we've created real-time pipelines for AI. Additionally, we've configured MongoDB Atlas Search indexes, utilizing features like Autocomplete, to build a cutting-edge search engine.
Grasping the complexities of e-commerce business models is complicated enough without also having to handle knotty integrations and operational overhead! Counting on the right tools for the job gets you several months ahead out-innovating the competition.
Check out the GitHub repository or reach out over LinkedIn if you want to discuss search or any other retail functionality!

Facebook Icontwitter iconlinkedin icon
Rate this tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Related
Tutorial

Getting Started With Azure Spring Apps and MongoDB Atlas: A Step-by-Step Guide


Jan 27, 2024 | 5 min read
Code Example

Trends Analyser


Sep 11, 2024 | 1 min read
Tutorial

Developing Your Applications More Efficiently with MongoDB Atlas Serverless Instances


Feb 03, 2023 | 7 min read
Article

Atlas Online Archive: Efficiently Manage the Data Lifecycle


Apr 23, 2024 | 8 min read
Table of Contents