使用MongoDB Atlas和代理AI构建面向未来的开放金融生态系统,为同意旅程和数据共享提供支持,从而实现信用可移植性。
行业: 金融服务
产品和工具: MongoDB Vector Search、MongoDB MCP 服务器、MongoDB Queryable Encryption
合作伙伴: LangChain
解决方案概述
该解决方案展示了一个开放式金融生态系统,并演示了如何使用MongoDB Atlas和 Agentic AI在机构之间安全地股票金融数据。
了解如何实现代理AI框架,以简化同意审批并改进金融产品以实现信用可移植性。MongoDB Atlas是支撑这些开放金融架构的运营数据层。
参考架构
图 1。流程图
如图所示,当客户登录 Leafy Bank(我们虚构的金融机构)时,该进程就开始了。客户同意或拒绝访问权限外部数据 — 第三方提供商 (TPP) 数据或其他金融机构数据(在本演示中: MongoDB Bank、NeoFinance 和 Green Bank)。
多代理工作流接收客户请求并执行以下任务:
主管代理:读取对话并将每个请求路由给正确的专家。
同意代理:指导客户与外部银行达成安全数据共享同意。它处理机构选择、同意创建、银行登录、显式客户批准和撤销。
可转移代理:分析外部银行数据以生成支出分数、信用评估和显示潜在节省费用的确定性贷款可转移优惠。
Leafy 银行代理:通过 MCP服务器直接查询MongoDB ,回答有关客户的 Leafy 银行账户、交易和产品的即席问题。
核心功能
该演示展示了MongoDB Atlas和代理AI支持安全、智能开放式金融工作流程的四项核心功能。
用于同意和代理隐私的Queryable Encryption
以明文形式存储同意记录会将敏感字段暴露给数据库管理员、备份进程和潜在的违规行为。为了防止这些风险,开放金融法规要求机构在每个同意生命周期事件中保护消费者身份:创建、授权、数据检索和撤销。静态加密还可以保护AI代理配置(例如系统提示和工具定义),以限制专有逻辑的暴露。
MongoDB Queryable Encryption通过在驱动程序级别加密敏感字段来解决这个问题,确保服务器永远看不到明文。可以为相等查询配置需要过滤器的字段。驱动程序在发送查询值之前对其进行加密,因此服务器可以在不查看明文的情况下匹配密文。仅需要解密后读取的字段将保持加密,而不支持查询。
该演示在两个地方应用了Queryable Encryption :
同意集合(开放金融后端中的
encrypted_consents),具有四个加密字段:Consumer.UserNameConsumer.UserIdPermissionsSourceInstitution.InstitutionName
Consumer.UserName字段支持相等查询,因此服务可以列出客户的同意情况,而数据库无需看到明文用户名。代理配置文件集合(聊天机器人后端中的
encrypted_agent_profiles),包含三个加密字段:agent_name(equality-queryable)system_prompttool_config
代理提示是在运行时从加密的MongoDB加载的。Queryable Encryption为每个字段生成一个单独的数据加密密钥。它支持将 AWS Key Management Service (KMS)、 Azure Key Vault和 Google Cloud KMS作为密钥管理提供商。以下示例显示了开放式金融后端的加密连接设置:
from pymongo import MongoClient from pymongo.encryption_options import AutoEncryptionOpts class EncryptedMongoDBConnection(MongoDBConnection): """Subclasses the standard connection — services that type-hint MongoDBConnection accept it without modification.""" def __init__(self, uri: str, auto_encryption_opts: AutoEncryptionOpts): self.uri = uri self.client = MongoClient(self.uri, auto_encryption_opts=auto_encryption_opts)
同意查询的工作方式与明文相同 —驱动程序透明地处理加密和解密:
# Standard query on a plaintext field — works as usual consent = consents_collection.find_one({"ConsentId": consent_id}) # Equality query on an encrypted field — same syntax, driver encrypts the filter value consents = list(consents_collection.find({"Consumer.UserName": user_name}))
加密连接扩展了标准 MongoDBConnection,因此每个对基类进行类型提示的服务都会接受该连接而不进行修改。
使用MongoDB Atlas 进行智能事务分类和向量搜索
当消费者通过同意共享外部银行数据时,到达的ACID 事务记录没有标准化的商户类别。“Nobu Restaurant”或“UBER TRIP”等原始商户字符串 — 不包含类别元元数据。如果没有分类,支出分析和信用评分就无法发挥作用。
Atlas 向量搜索通过将商户说明与 49 商户类别代码 (MCC) 的参考集合进行匹配,对这些事务进行分类。该演示使用 Voyage AI 的 voyage-finance-2 嵌入模型(专为金融文本构建)为 MCC 参考数据和传入事务说明生成 1024 维度向量。
以下示例显示了向量搜索分类管道:
class MCCClassificationService: def __init__(self, connection, db_name, collection_name): self.collection = connection.get_collection(db_name, collection_name) self.vo = voyageai.Client() self.model = "voyage-finance-2" def classify_batch(self, transactions): # Build query text from merchant name + description query_texts = [ self._build_query_text(txn.get("merchant_name", ""), txn.get("description", "")) for txn in transactions ] # Single batch embedding call — input_type="query" for search queries embed_result = self.vo.embed(query_texts, model=self.model, input_type="query") # Vector search each embedding against MCC reference codes for txn, embedding in zip(transactions, embed_result.embeddings): match = self._vector_search(embedding) if match: txn.update({ "MCC": match["MCC"], "CategoryName": match["CategoryName"], "confidence": round(match["score"], 4), }) def _vector_search(self, query_embedding, num_candidates=20, limit=1): pipeline = [ {"$vectorSearch": { "index": "mcc_codes_vector_index", "path": "embedding", "queryVector": query_embedding, "numCandidates": num_candidates, "limit": limit, }}, {"$project": { "MCC": 1, "MCCDescription": 1, "CategoryId": 1, "CategoryName": 1, "score": {"$meta": "vectorSearchScore"}, "_id": 0, }}, ] results = list(self.collection.aggregate(pipeline)) return results[0] if results else None
MCC 参考文档在种子时使用 input_type="document" 嵌入,而ACID 事务查询使用 input_type="query",遵循 Voyage AI 针对检索工作负载的非对称嵌入最佳实践。
分类是短暂的,结果会返回给代理,但从不持久化。外部银行数据是通过同意借用的,而不是拥有的。可迁移代理使用分类事务来计算支出分数,将其与最佳实践基准进行比较,并生成确定性的贷款可迁移要约。
使用MongoDB MCP 服务器 进行代理数据访问
财务顾问和消费者经常需要有关帐户数据的临时答案:“我的总余额是多少?”,“显示我最近的 10 笔事务”或“我有资格获得哪些产品?”。为每个可能的查询构建自定义API终结点是不切实际的。
MongoDB MCP 服务器将MongoDB集合公开为 LLM 代理可以直接调用的工具。该演示在应用程序初创企业时将 MCP服务器作为子进程启动,以只读模式将其连接到 leafy_bank数据库,并通过持久会话将生成的工具传递给 LangGraph代理。
以下示例显示了 MCP服务器集成:
from langchain_mcp_adapters.client import MultiServerMCPClient from langchain_mcp_adapters.tools import load_mcp_tools mcp_client = MultiServerMCPClient({ "mongodb": { "command": "npx", "args": ["-y", "mongodb-mcp-server@latest"], "transport": "stdio", "env": { **os.environ, "MDB_MCP_CONNECTION_STRING": LEAFY_BANK_MONGODB_URI, "MDB_MCP_READ_ONLY": "true", "MDB_MCP_DISABLED_TOOLS": disabled_tools, }, } }) # Persistent session keeps MongoDB connection state across tool calls async with mcp_client.session("mongodb") as session: all_mcp_tools = await load_mcp_tools(session) # Pre-connect so the agent never handles connection strings connect_tool = next((t for t in all_mcp_tools if t.name == "connect"), None) if connect_tool: await connect_tool.ainvoke({"connectionString": LEAFY_BANK_MONGODB_URI}) # Only expose read/query tools to the agent allowed_tools = {"find", "aggregate", "count", "list-collections", "collection-schema"} mcp_tools = [t for t in all_mcp_tools if t.name in allowed_tools]
叶子银行代理接收这些过滤后的工具以及从 LangGraph 配置中读取经过身份验证的客户标识符的 get_current_user_id 工具。它通过自主生成MongoDB查询来回答自然语言问题 —代理可以执行以下操作:
find:用于查找aggregate:用于计算Collection-schema:用于发现。
每个集合不需要自定义工具代码。
使用 LangGraph 进行多代理编排
开放式金融工作流程跨越不同的领域 — 同意管理、财务分析和内部银行数据查询。处理所有这三个问题的单个整体代理需要大型工具集和涵盖冲突关注点的系统提示。分成专门的代理可以使每个工具集较小,并且每次提示都有针对性。
主管代理协调三名专家:
同意代理:管理数据共享流
Portability Agent:分析外部数据以获取贷款报价
Leafy Bank Agent:通过 MCP服务器查询 Leafy Bank 数据。
LangGraph 根据意图将每条客户消息路由到相应的专家, MongoDB Atlas通过检查点集合保留对话状态。
以下示例显示了具有结构化输出的监控程序路由:
from langgraph.graph import StateGraph, START, END class RouterDecision(BaseModel): next: Literal["consent_agent", "portability_agent", "internal_data_agent", "FINISH"] response: str = "" workflow = StateGraph(AgentState) workflow.add_node("supervisor", supervisor) workflow.add_node("consent_agent", consent_agent) workflow.add_node("portability_agent", portability_agent) workflow.add_node("internal_data_agent", internal_data_agent) workflow.add_edge(START, "supervisor") workflow.add_conditional_edges("supervisor", route_from_supervisor, { "consent_agent": "consent_agent", "portability_agent": "portability_agent", "internal_data_agent": "internal_data_agent", "FINISH": END, }) workflow.add_edge("consent_agent", "supervisor") workflow.add_edge("portability_agent", "supervisor") workflow.add_edge("internal_data_agent", "supervisor") graph = workflow.compile(checkpointer=MongoDBSaver(client=db.client, db_name=DATABASE_NAME))
受监管的工作流程(同意批准、KYC 审查、付款授权)需要人工检查点,在这些检查点,代理必须暂停并等待决策,然后才能继续。LangGraph 的 interrupt() 机制通过将完整图表状态序列化为MongoDB并将有效负载返回给调用者来处理此要求。当外部进程完成时,工作流将恢复:
from langgraph.types import interrupt, Command # Agent pauses, returns review payload to the calling application review = interrupt({ "type": "APPROVAL_REQUIRED", "details": approval_details, }) # Application resumes the workflow after the human decision await agent.ainvoke(Command(resume=decision), config)
MongoDB Atlas检查点集合会保留完整会话状态:
消息历史记录
主动同意
路由决策
工作流程可承受持续数秒(单击按钮)或数小时(通宵合规查看)的中断。每个子代理运行一个 ReAct 循环(原因 → 行动 → 观察),直到产生最终响应,然后将控制权返回给主管代理以进行下一个路由决策。
数据模型方法
该演示使用两个MongoDB Atlas数据库:
leafy_bank:存储机构自身的数据,例如客户数账户、事务历史记录、贷款产品以及推动可移植性产品的承保规则。它还保存可移植代理用于分类的 MCC 代码引用和支出基准。open_finance:存储通过同意获得的数据,例如来自合作机构的外部账户、贷款、事务和还款历史记录。同意本身存在于一个加密集合中。单独的机构集合注册可用的外部存储体。
以下是集合中文档的示例:
accounts(leafy_bank):{ "_id": { "$oid": "675488b874a6710be0583b3e" }, "AccountNumber": "514624177", "AccountBank": "LeafyBank", "AccountStatus": "Active", "AccountIdentificationType": "AccountNumber", "AccountDate": { "OpeningDate": { "$date": "2024-12-07T17:41:12.710Z" } }, "AccountType": "Savings", "AccountBalance": 3910, "AccountCurrency": "USD", "AccountDescription": "Savings account for fridaklo", "AccountUser": { "UserName": "fridaklo", "UserId": { "$oid": "65a546ae4a8f64e8f88fb89e" } } } encrypted_consents(open_finance){ "_id": { "$oid": "69b444e3090356f30066927f" }, "ConsentId": "urn:greenbank:Cf5b9ff59e06f77", "Status": "CONSUMED", "Consumer": { "UserName": <encrypted data>, "UserId": <encrypted data> }, "Permissions": <encrypted data>, "Purpose": "PERSONAL_LOAN_PORTABILITY", "SourceInstitution": { "InstitutionName": <encrypted data>, "InstitutionId": "679a1001a9711d00a3bb01a1" }, "CreationDateTime": { "$date": "2026-02-05T10:55:30.061Z" }, "ExpirationDateTime": { "$date": "2026-08-04T10:55:30.061Z" }, "StatusUpdateDateTime": { "$date": "2026-02-05T11:13:52.900Z" }, "StatusHistory": [ { "Status": "AWAITING_AUTHORISATION", "DateTime": { "$date": "2026-02-05T10:55:30.061Z" }, "Reason": "Consent created" }, { "Status": "AUTHORISED", "DateTime": { "$date": "2026-02-05T10:56:06.100Z" }, "Reason": "Status changed to AUTHORISED" }, { "Status": "CONSUMED", "DateTime": { "$date": "2026-02-05T11:13:52.900Z" }, "Reason": "Data retrieved successfully" } ], "__safeContent__": [ { "$binary": { "base64": "36UjEj4mfh1fKHren43cLiOy6HVs4/b/g+e6viTwQ9Q=", "subType": "00" } } ] }
请访问下一节中的 GitHub 存储库,探索解决方案中所有集合的示例数据。
构建解决方案
要构建此解决方案,实现两个协调服务:
要完整实现,请按照相应 Github 存储库中的说明进行操作。
1 部分:开放金融后端 (GitHub存储库)
第 2 部分:代理聊天机器人后端(GitHub存储库)
关键要点
在MongoDB Atlas上统一开放式金融数据:在MongoDB Atlas上统一内部和外部数据集作为操作数据层,以降低集成复杂性和重复。
通过聚合管道简化分析:使用MongoDB聚合管道在单个查询路径中计算内部和外部账户的余额、债务总额、可移植性节省和支出分数。
使用MongoDB可查询加密保护敏感同意数据:将Queryable Encryption应用于同意属性,以便您可以对敏感字段查询,同时为受监管的开放式金融工作负载保持强大的隐私控制。
利用代理AI简化同意流程:集成基于 LangGraph 的多代理聊天机器人,以自然语言解释同意范围、持续时间和目的,从而减少多银行流程中的放弃并改善客户体验。
使数据结构与 ISO 20022最佳实践保持一致:使用受 ISO 20022 启发的字段和代码对外部事务进行建模,以便跨机构标准化数据,而无需过度设计深层嵌套模式。
作者
Saul Calderon
Kiran Tulsulkar
Ainhoa Múgica
Andrea Alaman Calderon
丹尼尔·贾米尔