MongoDB Atlas 提供了多种功能来构建 AI 代理。作为向量和文档数据库,Atlas 支持代理型 RAG 的各种搜索方法,以及将代理交互存储在同一个数据库中以进行短期和长期代理记忆。
什么是 AI 代理?
在生成式人工智能的背景下,AI 代理通常指的是能够通过结合 AI 模型(如 LLM)与一组预定义工具,以自主或半自主方式完成任务的系统。
AI 代理可以使用工具收集上下文、与外部系统交互并执行操作。它们可以确定自己的执行流程(规划),并记住以前的交互情况,为响应提供依据(记忆)。因此,AI 代理最适合需要推理、规划和决策的复杂任务。
架构
AI 代理通常包含以下组件的组合:
感知 | 您对代理的输入。文本输入是 AI 代理最常见的感知机制,但输入也可以是音频、图像或多模态数据。 |
计划 | 代理如何确定下一步做什么。该组件通常包括 LLM 和提示,使用反馈循环和各种 提示工程技术,例如思维链和 reAct,帮助 LLM 推理复杂任务。 AI 代理可以由作为决策者的单个 LLM、具有多个提示的 LLM、多个协同工作的 LLM,或这些方法的任意组合组成。 |
工具 | 代理如何为任务收集上下文信息。工具允许代理与外部系统交互,并执行诸如向量搜索、网络搜索或从其他服务调用 API 等操作。 |
内存 | 一种用于存储代理交互的系统,使代理能够从过去的经验中学习,从而做出相应的响应。记忆可以是短期记忆(用于当前会话)或长期记忆(在会话之间持久化)。 |
注意
AI 代理在设计模式、功能和复杂性上各不相同。要学习其他代理架构,包括多代理系统,请参阅代理设计模式。
使用 MongoDB 构建 AI 代理
MongoDB Atlas 支持以下用于构建 AI 代理的组件:
代理工具
在 AI 代理的上下文中,工具是指可以由代理以编程方式定义和调用的任何东西。工具扩展了代理的功能,不仅限于生成文本,还使其能够与外部系统交互、检索信息并采取操作。工具通常通过特定接口定义,其中包括:
名称和说明,帮助代理了解何时使用工具。
所需参数及其预期格式。
在被调用时执行实际操作的函数。
代理利用其推理功能,根据用户的输入和当前任务,确定使用哪种工具、何时使用以及提供哪些参数。
除了标准 MongoDB 查询之外,Atlas 还提供了多种搜索功能,您可以将其作为代理工具来使用。
Atlas Vector Search:执行向量搜索,根据语义含义和相似性检索相关上下文。要了解更多信息,请参阅 Atlas Vector Search 概述。
Atlas Search:执行全文搜索,根据关键字匹配和相关性分数检索相关上下文。要了解更多信息,请参阅 Atlas Search 概述。
混合搜索:结合 Atlas Vector Search 和 Atlas Search,充分利用两种方法的优势。如要了解更多信息,请参阅如何执行混合搜索。
您可以手动定义工具,也可以使用 LangChain 和 LangGraph 等框架定义工具,这些框架为工具创建和调用提供了内置抽象。
工具被定义为智能体可调用的函数,用于执行特定任务。例如,以下语法说明了如何定义一个运行向量搜索查询的工具:
def vector_search_tool(query: str) -> str: pipeline = [ { "$vectorSearch": { # Vector search query pipeline... } } ] results = collection.aggregate(pipeline) array_of_results = [] for doc in results: array_of_results.append(doc) return array_of_results
工具调用是代理用于执行工具的方式。您可以在代理中定义如何处理工具调用,或者使用框架处理此问题。这些通常定义为 JSON 对象,其中包括工具名称和其他要传递给工具的参数,这样代理就可以使用适当的参数调用工具。例如,以下语法展示了代理如何调用 vector_search_tool:
{ "tool": "vector_search_tool", "args": { "query": "What is MongoDB?" }, "id": "call_H5TttXb423JfoulF1qVfPN3m" }
Agentic RAG
通过使用 Atlas 作为向量数据库,您可以创建实现代理型 RAG 的检索工具,这是一种 RAG 的高级形式,允许您通过 AI 代理动态协调检索和生成过程。
这种方法使得能够实现更复杂的工作流和用户交互。例如,您可以配置 AI 代理,根据任务确定最佳检索工具,例如使用 Atlas Vector Search 进行语义搜索和 Atlas Search 进行全文搜索。您还可以为不同的集合定义不同的检索工具,以进一步自定义代理的检索功能。
代理记忆
代理的记忆涉及存储先前交互的信息,以便代理可以从过去的经验中学习,并提供更相关和个性化的响应。这对于需要上下文的任务尤其重要,例如会话代理,其中代理需要记住会话中的先前回合,以提供连贯且上下文相关的响应。代理记忆主要有两种类型:
短期记忆:存储当前会话的信息,如最近的对话回合和当前任务上下文。
长期记忆:跨会话保存信息,其中可以包括过去的对话和一段时间内的个性化首选项。
由于 Atlas 也是一个文档数据库,因此可以通过在 MongoDB 集合中存储交互来实现代理记忆。然后,代理可以根据需要查询或更新此集合。有多种方法可以使用 MongoDB 实现代理记忆:
对于短期记忆,您可以在存储交互时包含一个
session_id字段,用于标识特定会话,然后查询具有相同 ID 的交互,将其作为上下文传递给代理。为了长期记忆,您可能会处理与 LLM 的多次交互,以提取相关信息,例如用户偏好或重要的上下文,然后将这些信息存储在一个单独的集合中,以便代理在需要时查询。
要构建强大的内存管理系统,启用更高效、更复杂地检索对话历史记录,请利用Atlas Search或Atlas Vector Search来索引和查询存储的交互。
存储短期记忆的集合中的文档可能类似于以下内容:
{ "session_id": "123", "user_id": "jane_doe", "interactions": [ { "role": "user", "content": "What is MongoDB?", "timestamp": "2025-01-01T12:00:00Z" }, { "role": "assistant", "content": "MongoDB is the world's leading modern database.", "timestamp": "2025-01-01T12:00:05Z" } ] }
存储长期记忆的集合中的文档可能类似于以下内容:
{ "user_id": "jane_doe", "last_updated": "2025-05-22T09:15:00Z", "preferences": { "conversation_tone": "casual", "custom_instructions": [ "I prefer concise answers." ], }, "facts": [ { "interests": ["AI", "MongoDB"], } ] }
以下框架也为代理记忆提供了与 MongoDB 的直接抽象:
框架 | 功能 |
|---|---|
LangChain |
要了解更多信息,请参阅教程。 |
LangGraph |
如需了解更多信息,请参阅 LangGraph 和 LangGraph.js。 |
开始体验
以下教程演示了如何使用 Atlas 构建 AI 代理,以实现代理 RAG 和内存功能,而无需使用代理框架。
使用本教程的可运行版本以作为 Python 笔记本。
先决条件
如要完成本教程,您必须具备以下条件:
一个 Atlas 帐户,而其集群运行着 MongoDB 版本 6.0.11、7.0.2 或更高版本(包括 RC)。确保您的 IP 地址包含在 Atlas 项目的访问列表中。如需了解详情,请参阅创建集群。
Voyage AI API 密钥。
一个 OpenAI API 密钥。
注意
本教程使用来自 Voyage AI 和 OpenAI 的模型,但您可以修改代码以使用您选择的模型。
步骤
此 AI 代理可用于回答有关自定义数据源的问题并执行计算。它还可以记住之前的交互,以便提供更准确的响应。它使用以下组件:
感知:文本输入。
规划:使用一个LLM和多个提示来推理完成任务。
工具:向量搜索工具和计算器工具。
Memory:将交互存储在 MongoDB 集合中。
设置环境。
初始化项目并安装依赖项。
创建一个新的项目目录,然后安装所需的依赖项:
mkdir mongodb-ai-agent cd mongodb-ai-agent pip install --quiet --upgrade pymongo voyageai openai langchain langchain_mongodb langchain_community 注意
您的项目将使用以下结构:
mongodb-ai-agent ├── config.py ├── ingest-data.py ├── tools.py ├── memory.py ├── planning.py ├── main.py 配置环境。
在项目中创建一个名为
config.py的文件。此文件将包含代理 API 密钥、Atlas 连接字符串,以及 MongoDB 数据库和集合名称。将占位符值替换为您的 Atlas 连接字符串以及您的 Voyage AI 和 OpenAI API 密钥。
from pymongo import MongoClient from openai import OpenAI import voyageai MONGODB_URI = "<connection-string>" VOYAGE_API_KEY = "<voyage-api-key>" OPENAI_API_KEY = "<openai-api-key>" # MongoDB Atlas configuration mongo_client = MongoClient(MONGODB_URI) agent_db = mongo_client["ai_agent_db"] vector_collection = agent_db["embeddings"] memory_collection = agent_db["chat_history"] # Model configuration voyage_client = voyageai.Client(api_key=VOYAGE_API_KEY) client = OpenAI(api_key=OPENAI_API_KEY) VOYAGE_MODEL = "voyage-3-large" OPENAI_MODEL = "gpt-4o" 注意
连接字符串应使用以下格式:
mongodb+srv://<db_username>:<db_password>@<clusterName>.<hostname>.mongodb.net
将 Atlas 作为矢量数据库。
在项目中创建一个名为 ingest-data.py 的文件。该脚本使用 voyage-3-large 嵌入模型,将包含近期 MongoDB 收益报告的示例 PDF 文件导入到 Atlas 的一个集合。
要了解更多信息,请参阅摄取。
from config import vector_collection, voyage_client, VOYAGE_MODEL from pymongo.operations import SearchIndexModel from langchain_community.document_loaders import PyPDFLoader from langchain.text_splitter import RecursiveCharacterTextSplitter import time # Define a function to generate embeddings def get_embedding(data, input_type = "document"): embeddings = voyage_client.embed( data, model = VOYAGE_MODEL, input_type = input_type ).embeddings return embeddings[0] # --- Ingest embeddings into MongoDB Atlas --- def ingest_data(): # Chunk PDF data loader = PyPDFLoader("https://investors.mongodb.com/node/13176/pdf") data = loader.load() text_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=20) documents = text_splitter.split_documents(data) print(f"Successfully split PDF into {len(documents)} chunks.") # Ingest chunked documents into collection print("Generating embeddings and ingesting documents...") docs_to_insert = [] for i, doc in enumerate(documents): embedding = get_embedding(doc.page_content) if embedding: docs_to_insert.append({ "text": doc.page_content, "embedding": embedding }) if docs_to_insert: result = vector_collection.insert_many(docs_to_insert) print(f"Inserted {len(result.inserted_ids)} documents into the collection.") else: print("No documents were inserted. Check embedding generation process.") # --- Create the vector search index --- index_name = "vector_index" search_index_model = SearchIndexModel( definition = { "fields": [ { "type": "vector", "numDimensions": 1024, "path": "embedding", "similarity": "cosine" } ] }, name=index_name, type="vectorSearch" ) try: vector_collection.create_search_index(model=search_index_model) print(f"Search index '{index_name}' creation initiated.") except Exception as e: print(f"Error creating search index: {e}") return # Wait for initial sync to complete print("Polling to check if the index is ready. This may take up to a minute.") predicate=None if predicate is None: predicate = lambda index: index.get("queryable") is True while True: indices = list(vector_collection.list_search_indexes(index_name)) if len(indices) and predicate(indices[0]): break time.sleep(5) print(index_name + " is ready for querying.")
为代理定义工具。
在项目中创建一个名为 tools.py 的文件。该文件定义了代理可以用来回答问题的工具。在此示例中,您定义以下工具:
vector_search_tool运行向量搜索查询,从集合中检索相关文档。calculator_tool:使用eval()函数进行基本数学运算。
from config import vector_collection from ingest_data import get_embedding # Define a vector search tool def vector_search_tool(user_input: str) -> str: query_embedding = get_embedding(user_input) pipeline = [ { "$vectorSearch": { "index": "vector_index", "queryVector": query_embedding, "path": "embedding", "exact": True, "limit": 5 } }, { "$project": { "_id": 0, "text": 1 } } ] results = vector_collection.aggregate(pipeline) array_of_results = [] for doc in results: array_of_results.append(doc) return array_of_results # Define a simple calculator tool def calculator_tool(user_input: str) -> str: try: result = eval(user_input) return str(result) except Exception as e: return f"Error: {str(e)}"
为代理添加内存。
在项目中创建一个名为 memory.py 的文件。此文件定义了代理用于存储其交互的系统。在此示例中,您通过定义以下函数来实现短期记忆:
store_chat_message:将交互信息存储在 MongoDB 集合中。retrieve_session_history:通过使用session_id字段来获取特定会话的所有交互。
from config import memory_collection from datetime import datetime from typing import List def store_chat_message(session_id: str, role: str, content: str) -> None: message = { "session_id": session_id, # Unique identifier for the chat session "role": role, # Role of the sender (user or system) "content": content, # Content of the message "timestamp": datetime.now(), # Timestamp of when the message was sent } memory_collection.insert_one(message) def retrieve_session_history(session_id: str) -> List: # Query the collection for messages with a specific "session_id" in ascending order cursor = memory_collection.find({"session_id": session_id}).sort("timestamp", 1) # Iterate through the cursor and return a JSON object with the message role and content if cursor: messages = [{"role": msg["role"], "content": msg["content"]} for msg in cursor] else: messages = [] return messages
定义代理的计划。
在项目中创建一个名为 planning.py 的文件。该文件将包括各种提示和 LLM 调用,以确定代理的执行流程。在此示例中,您定义以下函数:
tool_selector:决定 LLM 如何为任务选择合适的工具。generate_answer:通过使用工具、调用 LLM 和处理结果来编排代理执行流。get_llm_response:用于生成 LLM 响应的辅助函数。
from config import openai_client, OPENAI_MODEL from tools import vector_search_tool, calculator_tool from memory import store_chat_message, retrieve_session_history # Define a tool selector function that decides which tool to use based on user input and message history def tool_selector(user_input, session_history=None): messages = [ { "role": "system", "content": ( "Select the appropriate tool from the options below. Consider the full context of the conversation before deciding.\n\n" "Tools available:\n" "- vector_search_tool: Retrieve specific context about recent MongoDB earnings and announcements\n" "- calculator_tool: For mathematical operations\n" "- none: For general questions without additional context\n" "Process for making your decision:\n" "1. Analyze if the current question relates to or follows up on a previous vector search query\n" "2. For follow-up questions, incorporate context from previous exchanges to create a comprehensive search query\n" "3. Only use calculator_tool for explicit mathematical operations\n" "4. Default to none only when certain the other tools won't help\n\n" "When continuing a conversation:\n" "- Identify the specific topic being discussed\n" "- Include relevant details from previous exchanges\n" "- Formulate a query that stands alone but preserves conversation context\n\n" "Return a JSON object only: {\"tool\": \"selected_tool\", \"input\": \"your_query\"}" ) } ] if session_history: messages.extend(session_history) messages.append({"role": "user", "content": user_input}) response = openai_client.chat.completions.create( model=OPENAI_MODEL, messages=messages ).choices[0].message.content try: tool_call = eval(response) return tool_call.get("tool"), tool_call.get("input") except: return "none", user_input # Define the agent workflow def generate_response(session_id: str, user_input: str) -> str: # Store the user input in the chat history collection store_chat_message(session_id, "user", user_input) # Initialize a list of inputs to pass to the LLM llm_input = [] # Retrieve the session history for the current session and add it to the LLM input session_history = retrieve_session_history(session_id) llm_input.extend(session_history) # Append the user message in the correct format user_message = { "role": "user", "content": user_input } llm_input.append(user_message) # Call the tool_selector function to determine which tool to use tool, tool_input = tool_selector(user_input, session_history) print("Tool selected: ", tool) # Process based on selected tool if tool == "vector_search_tool": context = vector_search_tool(tool_input) # Construct the system prompt using the retrieved context and append it to the LLM input system_message_content = ( f"Answer the user's question based on the retrieved context and conversation history.\n" f"1. First, understand what specific information the user is requesting\n" f"2. Then, locate the most relevant details in the context provided\n" f"3. Finally, provide a clear, accurate response that directly addresses the question\n\n" f"If the current question builds on previous exchanges, maintain continuity in your answer.\n" f"Only state facts clearly supported by the provided context. If information is not available, say 'I DON'T KNOW'.\n\n" f"Context:\n{context}" ) response = get_llm_response(llm_input, system_message_content) elif tool == "calculator_tool": # Perform the calculation using the calculator tool response = calculator_tool(tool_input) else: system_message_content = "You are a helpful assistant. Respond to the user's prompt as best as you can based on the conversation history." response = get_llm_response(llm_input, system_message_content) # Store the system response in the chat history collection store_chat_message(session_id, "system", response) return response # Helper function to get the LLM response def get_llm_response(messages, system_message_content): # Add the system message to the messages list system_message = { "role": "system", "content": system_message_content, } # If the system message should go at the end (for context-based queries) if any(msg.get("role") == "system" for msg in messages): messages.append(system_message) else: # For general queries, put system message at beginning messages = [system_message] + messages # Get response from LLM response = openai_client.chat.completions.create( model=OPENAI_MODEL, messages=messages ).choices[0].message.content return response
测试该代理。
最后,在项目中创建一个名为 main.py 的文件。此文件运行代理并允许您与其交互。
from config import mongo_client from ingest_data import ingest_data from planning import generate_response if __name__ == "__main__": try: run_ingest = input("Ingest sample data? (y/n): ") if run_ingest.lower() == 'y': ingest_data() session_id = input("Enter a session ID: ") while True: user_query = input("\nEnter your query (or type 'quit' to exit): ") if user_query.lower() == 'quit': break if not user_query.strip(): print("Query cannot be empty. Please try again.") continue answer = generate_response(session_id, user_query) print("\nAnswer:") print(answer) finally: mongo_client.close()
保存项目,然后运行以下命令。当您运行代理时:
如果还没有,请指示代理引入示例数据。
请输入会话 ID,开始新会话或继续现有会话。
提出问题。代理会根据您的工具、上一个交互以及在规划阶段定义的提示生成响应。请参阅示例输出,获取示例交互:
python main.py
Ingest sample data? (y/n): y Successfully split PDF into 104 chunks. Generating embeddings and ingesting documents... Inserted 104 documents into the collection. Search index 'vector_index' creation initiated. Polling to check if the index is ready. This may take up to a minute. vector_index is ready for querying. Enter a session ID: 123 Enter your query (or type 'quit' to exit): What was MongoDB's latest acquisition? Tool selected: vector_search_tool Answer: MongoDB's latest acquisition was Voyage AI. Enter your query (or type 'quit' to exit): What do they do? Tool selected: vector_search_tool Answer: Voyage AI is a company that specializes in state-of-the-art embedding and reranking models designed to power next-generation AI applications. These technologies help organizations build more advanced and trustworthy AI capabilities. Enter your query (or type 'quit' to exit): What is 123+456? Tool selected: calculator_tool Answer: 579
提示
您可以通过导航到集群中的 Atlas 数据库并选择 ai_agent_db 或集合,查看 Atlas 用户界面中的 embeddings chat_history 嵌入和交互。
Tutorials
如需更多关于使用 MongoDB 构建 AI 代理的教程,请参阅下表: