使用MongoDB Atlas Stream Processing 和AI防止客户流失。实时检测客户的犹豫并trigger下一步最佳操作。
行业: 零售
产品: MongoDB Voyage AI、 MongoDB Atlas、 MongoDB Atlas Stream Processing、 MongoDB Vector Search
合作伙伴: Amazon Bedrock
解决方案概述
客户数保留是指组织保持客户购买其产品并转向其他提供商的能力。提高留存率,推动零售的长期成功。将保留率提高 5% 可以将利润提高 25% 至 95%。留住客户的成本远低于获取新客户的成本。现代商务平台会生成大量行为数据。客户通过搜索、产品视图、购物车操作和浏览活动创建这些数据。
大多数零售商将这些数据存储在记录系统中,并稍后通过批处理管道分析。因此,它们仅在会话结束后做出反应。几小时后,他们:
发送已放弃的购物车电子邮件。
开始重新定位营销活动。
查看仪表盘。
那时,客户已经离开,你就失去了影响购买的机会。在客户数仍处于活动状态时做出响应。从记录系统转向操作系统。检测会话期间的行为信号并实时做出反应。
图 1。批处理与流处理:稍后分析的存储数据与实时分析的事件。
使用MongoDB Atlas和Atlas Stream Processing构建实时行为管道。这些管道在点击流事件发生时对其进程,并将原始事件转换为可操作的上下文。使用这些工具,您可以:
创建捕获当前行为上下文的实时会话内存。
从近乎实时的交互模式中检测行为信号。
Trigger Agentic 指南客户实现转化的“下一步最佳行动”(NBA)。
通过 MongoDB Change Streams 对行为信号 React 。
将会话内存、客户数据和业务策略组合在统一的MongoDB智能数据层中。
通过在同一平台上使用Atlas Stream Processing以及熟悉的MongoDB查询API (MQL)和聚合框架,您可以避免基于SQL的流处理的僵化。通过使用灵活的MongoDB document model,您可以表示不断变化的会话上下文。单个会话文档捕获历史行为和最新活动快照,从而实现近乎实时的行为分析。
直接在MongoDB中处理流,而不是管理单独的流媒体基础架构,从而简化架构,并更快地交付事件驱动的应用程序。
图 2。使用Atlas Stream Processing实现这三个核心原则
您可以实时检测行为模式,以提高客户保留率,例如:
购买意向
搜索摩擦
被遗弃的风险
出现这些信号时立即做出反应。trigger 有针对性的操作,例如:
量身定制的产品推荐
上下文社交证明通知
与运输相关的优惠
图 3。根据实时用户行为实时触发下一步最佳操作,从而提高转化的可能性。
在客户会话处于活动状态时执行以下操作:
提高转化率。
提高客户终生价值。
构建长期忠诚度。
构建由MongoDB Atlas提供支持的实时客户保留系统。
参考架构
Atlas Stream Processing和MongoDB Atlas的客户数保留架构概述
要实现此解决方案,您必须了解数据流、事件处理阶段和关键架构组件:
图 4。由...提供支持MongoDB Atlas提供支持的客户数保留引擎
事件摄取层
在电子商务应用程序中,客户交互会生成实时事件流。您可以使用此应用程序执行以下操作:
每 10 秒发出一次心跳事件,以表明客户会话保持活动状态。
捕获客户操作,例如搜索、产品视图、添加到购物车和退出悬停。
在此演示解决方案中,您将事件流到MongoDB集合中。Change Streams将每个文档公开为一个新事件,从而创建为Stream Processing层提供数据的实时事件源。
您还可以从Apache Kafka、Google Cloud Pub/Sub、 Azure事件中心或 AWS Kinesis等平台摄取这些流,作为Atlas Stream Processing的源,而无需将事件存储在MongoDB Atlas中。
Atlas Stream Processing
Atlas Stream Processing用于在数据到达时对其进程。这使您能够:
使用
$source操作符连接到数据流。将多个流处理器用于不同的任务,例如构建会话状态和检测行为信号模式。
生成行为信号,例如高购买意向、搜索摩擦和退出风险。
数据接收器
$merge操作符用于将处理后的数据发送到目标。在此解决方案中:流处理器写入MongoDB ,以充当代理决策层的实时会话内存。
Atlas Stream Processing将从已处理事件派生的行为信号写入MongoDB。
MongoDB Change Streams会公开这些信号,以便在关键客户保留信号出现时触发Agentic NBA 层。
$merge操作符还可以将结果发送到Apache Kafka、AWS S3 或外部函数。助手
部署AI代理来读取行为信号并计算 NBA,例如:
量身定制的产品推荐
上下文社交证明通知
与运输相关的优惠
代理充当决策层。它可以作为简单或高级代理来实施。该解决方案使用结合了模型上下文协议 (MCP)、向量搜索、Voyage AI嵌入模型和大型语言模型 (LLM) 的确定性代理。这些工具评估上下文并生成最佳 NBA,以保持活跃购物者的参与度。
MongoDB Atlas
使用MongoDB Atlas作为商业和客户上下文层。该代理会从Atlas读取操作数据和客户历史记录以了解上下文。该代理会评估此数据并将 NBA 写入MongoDB集合。
Atlas可安全、扩展地简化整个代理生态系统的数据集成。这使得操作数据能够与目录、客户偏好和其他向量搜索用例的嵌入一起存在。
实时体验
在客户屏幕上实时显示 NBA 比赛。电子商务应用程序使用 Change Streams监控NBA集合。然后,应用程序会向活跃购物者显示有针对性的通知。示例包括:
战略产品上的消防图标
社会证明消息作为通知或在购物车列表中
以弹出窗口形式显示推荐或折扣通知
使用Atlas Stream Processing进行行为信号检测
尽管原始事件通常很嘈杂,但每个产品视图、搜索或购物车操作都包含一丝意图。使用Atlas Stream Processing将这些事件转换为清晰的会话上下文。
保持此上下文实时更新。用它来追踪会话中的行为并检测重要模式。以易于查询、解释和操作的格式,为决策层提供对此上下文的快速访问权限。在此架构中, Atlas Stream Processing就扮演该角色。
图 5。使用Atlas Stream Processing进行行为信号检测
ASP #1 不断将点击流 (1) (2) 转换为每个活动会话的单个 session_state文档,将会话内存与最新的 10 秒行为快照(4) 相结合。ASP #2 读取新的事件源 (5) 以检测更高级别的行为信号并将其汇入 session_signals (7)。两者都将处理后的数据存储在MongoDB中,而无效事件则路由到 DLQ 集合 (3) (6) 以进行调试和故障排除。
Atlas Stream Processing No.1:构建实时会话状态
为每个客户会话构建实时会话状态。此会话状态有两个用途:
构建会话内存:创建下游系统所需的会话内存。每个会话维护一个已处理文档,并每 10 秒更新一次。存储基本的会话上下文,例如第一个活动、最后一个活动、交互计数、最近行为和搜索历史记录。
解释意图:Go存储活动,还可以通过针对您的领域量身定制的意图模型来解释活动。
在此演示中,每 10 秒对最近的客户事件群组,以创建行为的结构化视图。使用此视图可追踪客户的注意力集中在何处以及该注意力如何随时间推移而转移。
使用三个概念定义意图模型:
维度:每次交互的行为视角。产品事件可标识特定产品和更广泛的上下文,例如
articleType和brand。这使您能够在多个级别读取每个事件的意图。客户可能会在探索多种产品的同时,继续关注同一类型的“维度”,例如文章类型。权重:交互信号的强度。并非所有操作都具有相同的意义。
在此演示中,权重为:
view-product = 3
add-to-cart = 7
对于维度中的每个项目,计算权重如下:
权重(项目)= 10 秒窗口中该项目的事件权重总和
示例:
如果客户查看“X”产品 (
P1) 两次,并将 P1 添加到购物车一次,并且 itemType 为“shoes”,则:权重 (P1) = 3 + 3 + 7 = 13
权重(鞋子)= 3 + 3 + 7 = 13
此计算表明对特定产品和“鞋子”都有强烈的兴趣。
焦点:使用焦点来衡量客户的注意力在某个维度内的集中程度。
对于维度中的每个项目,计算:
Focus(item) = Weight(item) / 同一窗口中该维度的总权重
示例:
如果 P1 是客户在该窗口中与之交互的唯一产品,则:
焦点 (P1) = 13 / 13 = 1.0
这意味着客户完全专注于一种产品。
如果客户在同一窗口中与两个产品进行交互,并且每个产品属于不同的 itemType,则:
视图产品 P1,其中,articleType = Shampoo → Weight(Shampoo) = 3
查看产品 P2,并将产品 P2 添加到购物车,其中,articleType = Conditioner → Weight(Conditioner) = 3+7 = 10
然后:
总权重(文章类型)= 3 + 10 = 13
So:
Focus(Shampoo) = 3 / 13 = 0.23
焦点(调节器)= 10 / 13 = 0.77
这意味着客户的注意力分散在文章类型维度中的多个项目上,但更关注护发素。
使用此模型追踪这些变量随时间的变化。每个 10 秒快照都会成为Atlas Stream Processing的结构化行为输入,2它会随着时间的推移评估更高级别的模式。这有助于您确定客户是否:
专注于特定商品、类别或品牌
广泛探索
表现出明确或不确定的意图
使用原生Atlas Stream Processing功能构建此会话状态:
使用 $source 从
events_ingest读取事件并保留用于窗口化的事件时间。使用 $validate 实施事件约定并将无效事件发送到
events_ingest_dlq。使用 $tumbleWindow 具有 10 秒事件时间间隔的 $tumbleWindow 为每个会话创建一个稳定的处理帧。
使用 $ 群组 将多个原始事件折叠为每个窗口的一个每会话快照。
使用 $addFields 和 $switch 为每次交互分配权重。
使用 $function 计算每个维度的权重和焦点的意图模型。您可以使用JavaScript逻辑进程每个滚动窗口中的事件,使用计数器和算术每 10 秒生成一次会话行为的结构化快照。这会创建一设立新数据,供 2 号Atlas Stream Processing在下一阶段使用。
使用 $merge 将结果更新或插入(upsert)到
session_state中,并为每个会话保留一份实时文档,其中包含最新快照和累积会话内存。
Atlas Stream Processing第 2 号:检测一段时间内的行为模式
从 session_state 读取结构化会话快照,而不是读取原始点击流事件。这为评估提供了一个经过处理的、具有时间感知性的行为层。
在此演示中,2 号Atlas Stream Processing使用三个流处理器。每个处理器检测一种信号类型:
high-intentsearch-frictionexit-risk
所有三个处理器都遵循相同的模式:
从中读取会话快照
session_state跨 30 秒窗口评估行为
将仅插入信号文档写入
session_signals
这种设计使管道保持简单和模块化。每个处理器使用相同的源和汇模式,但应用不同的规则来检测行为信号。将这些信号用作行为检查点。每个捕获都捕获会话行为中有意义的转变,例如:
产品融合
未解决的探索
打算离开。
保持信号稀疏、有时间限制且可解释。这使得决策层仅在相关事件发生时做出反应,而不是处理每个原始事件。
Atlas Stream Processing No.2 并不能决定下一个操作。它构建了决策系统用于实时推理和行动的行为信号层。
图 6。为AI代理构建统一智能数据层
下一个最佳操作代理在MongoDB中的统一智能数据层之上运行。它通过变更流对行为信号 (1) 做出反应,从 session_state (2.a) 获取实时上下文,并将其与MongoDB中存储的业务和语义上下文相结合,例如用户个人资料、目录、促销、规则、嵌入和MongoDB Vector Search (2.b)。该代理实时计算并存储下一个最佳动作(3),然后电子商务应用程序通过变更流使用该操作,并在实时会话期间显示以保留客户 (4)。
通过在MongoDB中存储会话状态、行为信号和决策输出,您可以为代理提供一个统一的智能数据层。代理可以读取该会话的新行为背景和历史信号。然后,它将这些数据与配置文件、目录、促销和业务规则相结合,在同一操作平台上写入回 NBA。这样可以保持架构简单,减少数据移动,并使代理快速访问权限以安全且可扩展的方式实时采取行动所需的上下文。
数据模型方法
MongoDB为实时架构提供了灵活的文档模型。将具有不同模式的各种客户事件存储在单个集合中。随着业务逻辑的发展更新模式。
文档模型可处理复杂的结构,例如数组和子文档。使用这些结构构建紧凑、可查询的会话内存。这种设计为AI代理提供了即时上下文,无需连接数据库。使用生存时间 (TTL) 索引来消除手动数据清理脚本。
此解决方案使用四个主要集合:
events_ingest:存储原始的短期数据事件。它是流媒体管道的主节点 (primary node in the replica set)来源。
session_state:存储每个会话的实时操作上下文,并大约每 10 秒获取更新。
session_signals:存储可解释的行为信号。Atlas Stream Processing为每个检测到的信号生成一份文档。
next_best_actions:存储电子商务接口的最终操作合同。
原始事件引入 (events_ingest)
电子商务应用程序将原始事件流式传输到
events_ingest集合中。每个事件都遵循一个通用结构,其中包含时间戳、标签和元数据。事件字段标识事件类型,而元数据部分存储特定于事件的属性。这种多态设计使不同的事件类型能够共存于同一集合中,而无需实施严格的模式。使用短TTL索引来管理这些临时数据。{ "timestamp": "2026-01-05T14:55:12.321Z", "tags": { "sessionId": "1767624420027", "userId": "66fe219d625d93a100528224", "event": "search" }, "metadata": { // Event-specific fields } } 物化会话状态 (session_state)
会话状态文档代表每个会话的单一实时事实来源。Atlas Stream Processing第 1 号每 10 秒更新一次此文档。这将创建一个紧凑的上下文,作为 2 号Atlas Stream Processing的源,并作为活动会话中代理决策的实时短期上下文。
该文档将累积活动 (
sessionTotals) 与最近的微窗口活动 (last10s) 分开。追踪事件计数、最近搜索和购物车更新。使用加权交互模型计算项目偏好分数。该模型衡量兴趣强度和关注方向。{ "_id": { "$oid": "69c16dd2f2145b31c07b1beb" }, "firstSeen": { "$date": "2026-03-23T16:43:59.013Z" }, "lastEvent": { "event": "view-product", "ts": { "$date": "2026-03-23T16:49:29.849Z" } }, "lastSeen": { "$date": "2026-03-23T16:49:29.849Z" }, "sessionId": "d73477b8-3879-4749-a402-321a526cdc33", "userId": "671ff2451ec726b417352703", "last10s": { "intent": { "products": [ { "productId": "67192b4264d161905fbe8342", "searchCount": 0, "viewCount": 0, "addToCartCount": 1, "weight": 7, "focus": 0.4375 }, { "productId": "67192b4264d161905fbe8245", "searchCount": 0, "viewCount": 1, "addToCartCount": 0, "weight": 3, "focus": 0.1875 }, { "productId": "67192b4264d161905fbe82a1", "searchCount": 0, "viewCount": 1, "addToCartCount": 0, "weight": 3, "focus": 0.1875 }, { "productId": "67192b3f64d161905fbe77af", "searchCount": 0, "viewCount": 1, "addToCartCount": 0, "weight": 3, "focus": 0.1875 } ], "articleTypes": [ { "articleType": "SHOES", "searchCount": 0, "viewCount": 2, "addToCartCount": 1, "weight": 13, "focus": 0.8125 }, { "articleType": "CARGO_STRAP", "searchCount": 0, "viewCount": 1, "addToCartCount": 0, "weight": 3, "focus": 0.1875 } ], "subCategories": [ { "subCategory": "Shoes", "searchCount": 0, "viewCount": 2, "addToCartCount": 1, "weight": 13, "focus": 0.8125 }, { "subCategory": "Hardware", "searchCount": 0, "viewCount": 1, "addToCartCount": 0, "weight": 3, "focus": 0.1875 } ], "dimensionTotals": { "productWeightTotal": 16, "articleTypeWeightTotal": 16, "subCategoryWeightTotal": 16 } }, "lastEvent": { "event": "view-product", "ts": { "$date": "2026-03-23T16:49:29.849Z" } }, "window": { "start": { "$date": "2026-03-23T16:49:20.000Z" }, "end": { "$date": "2026-03-23T16:49:30.000Z" } } }, "sessionTotals": { "eventCounts": { "heartbeat": 30, "search": 2, "view-product": 21, "add-to-cart": 13, "exit-risk": 14 }, "windowCount": 30 }, "searchHistory": [ "shoes", "running shoes" ] } 可行的诊断 (session_signals)
Atlas Stream Processing 2 每 30 秒评估一次会话状态,识别行为模式并生成信号。对每个信号使用一个文档,而不是会话文档中的一个数组信号。这种仅插入方法可避免写入放大,并防止在零售流量高峰期间出现热文档。
该解决方案侧重于三个核心行为信号,这些信号代表大多数电子商务旅程中具有高影响力的保留时刻。
高意向:客户正在积极考虑购买并表现出强烈的意向信号(例如重复关注产品、购物车进展、集中兴趣)。这是通过保证、产品指导、可用性和交付清晰度来减少疑虑和加速转换的理想时机。
退出风险:客户很可能在没有转化的情况下离开,通常是在有意义的互动或购物车活动之后。这是保留会话、恢复购物车、保留意图或提供立即帮助的最后机会。
搜索摩擦:客户重复搜索和浏览但没有任何进展,这表明很难找到匹配项或感到沮丧。在客户因疲劳或沮丧而放弃会话之前,这是早期干预的最有价值的机会之一。
以下是搜索摩擦信号的示例文档:
{ "_id": "69c172667348bc2e9b60ee7b", "evidence": "Explored a considerable number of products in the last 30 seconds, while articleType focus remained predominant on 'PET_SUPPLIES'. The absence of add-to-cart suggests stable topic-level intent combined with choice overload at the product level", "severity": "medium", "sid": "d73477b8-3879-4749-a402-321a526cdc33", "signal": "search-friction", "topic": { "dimension": "articleType", "value": "PET_SUPPLIES" }, "ts": "2026-03-23T17:03:20.000Z", "uid": "671ff2451ec726b417352703" }
构建解决方案
要在您自己的环境中重现此演示,请按照以下步骤操作。
设置先决条件
在数据库中预加载产品。使用提供的转储文件恢复数据。
部署绿叶快闪店主应用程序。此应用程序包含前端和事件流系统。
Python 3.12 或更高版本
用于基岩访问权限的AWS凭证。
Voyage AI API key.
配置Atlas Stream Processing
创建 Stream Processing 工作区
使用以下设置在Atlas帐户中创建工作区:
设置值原因工作区名称
零售-retention-demo
明确的所有权和目的;便于以后识别。
层级
SP 10
足以满足演示规模的管道(约 2k 个会话),同时保持较低的成本。
提供商/区域
AWS / us-east-1 (N.弗吉尼亚州)
应与您的Atlas 集群区域相匹配,以减少延迟并避免跨区域流量。
最大层级大小(可选)
保留默认或设立为 SP30
允许在需要时快速扩展,而无需启用自动扩展。
注册数据库连接
打开 Stream Processing 工作区并配置以下连接设置:
连接类型Atlas 数据库connectionName
Retail_customer_retention
Atlas Cluster
选择包含 leafy_popup_store数据库的集群
执行为
读取和写入任何数据库
创建流处理器
在工作区中创建四个Atlas Stream Processor。对于第一个处理器,请执行以下步骤。对其余三个处理器重复该进程。
创建Atlas Stream Processing No. 1:会话状态构建器
打开您的 Stream Processing 工作区。
单击创建处理器。
输入 asp1_session_state_builder 作为处理器名称。
打开存储库中的asp1_session_state_builder_.js文件。
复制管道定义。
将管道粘贴到 Processor Definition 编辑器中。
单击创建处理器。
单击“开始”。
重复上一个步骤以创建这三个处理器:
processorName管道asp2_exit_riskasp2_high_intentasp2_search_friction
配置 NBA 决策层
git clone https://github.com/mongodb-industry-solutions/retail-customer-retention-backend/tree/staging cd retail-customer-retention-backend 创建并激活虚拟环境。
python3 -m venv .venv source .venv/bin/activate # On Windows: .venv\Scripts\activate 安装依赖项。
pip install -r requirements.txt 在根目录中创建
.env文件并配置以下环境变量:MONGODB_URI= AWS_REGION= AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= VOYAGE_API_KEY= 启动应用程序。此脚本会启动 MCP服务器和变更流监控。
python main.py
测试解决方案
通过 /shop 打开应用程序。与目录交互。应用程序会在屏幕上显示“下一个最佳操作”。
关键要点
实时留住客户:借助Atlas Stream Processing,您可以进程点击流事件,以在活动会话期间触发下一步最佳操作。这使零售商能够从被动的损害控制转向主动的即时干预。
简化数据架构:借助MongoDB文档模型,您可以在单个平台中存储原始点击流事件、会话状态和行为信号。使用TTL索引和灵活集合,避免提取、转换和加载管道以及手动清理。
在MongoDB中的智能数据层之上构建代理式下一个最佳操作:将AI代理直接连接到MongoDB中的实时会话上下文和操作数据。利用 向量搜索 等AI功能丰富下一步最佳行动,提供量身定制的体验。
作者
Angie Guemes, MongoDB
Florencia Arin, MongoDB
Rodrigo Leal, MongoDB
Daniel Jamir, MongoDB