MongoDB provides a range of capabilities for building AI agents. As a vector and document database, MongoDB supports various search methods for agentic RAG, and it lets you store agent interactions in the same database for short-term and long-term agent memory.
What is an AI Agent?
In the context of generative AI, AI agents typically refer to systems that combine an AI model, such as an LLM, with a set of predefined tools to complete tasks autonomously or semi-autonomously.

AI agents can use tools to gather context, interact with external systems, and perform actions. They can determine their own execution flow (planning) and remember previous interactions to inform their responses (memory). AI agents are therefore best suited for complex tasks that require reasoning, planning, and decision-making.
Architecture

AI agents typically consist of some combination of the following components:
Component | Description
---|---
Perception | The inputs to your agent. Text input is the most common perception mechanism for AI agents, but inputs can also be audio, images, or multi-modal data.
Planning | How the agent determines what to do next. This component typically consists of an LLM and prompts, and uses feedback loops and various prompt-engineering techniques, such as chain-of-thought and ReAct, to help the LLM reason through complex tasks. AI agents might consist of a single LLM as the decision-maker, an LLM with multiple prompts, several LLMs working together, or any combination of these approaches.
Tools | How the agent gathers context for a task. Tools allow agents to interact with external systems and perform actions such as running a vector search, searching the web, or calling APIs from other services.
Memory | A system for storing agent interactions so that the agent can learn from past experiences and respond accordingly. Memory can be short-term (within the current session) or long-term (persisted across sessions).
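To see how these components fit together, the following is a minimal sketch of an agent loop, assuming hypothetical `llm` and `tools` callables that stand in for your chat model and tool implementations:

```python
# Minimal sketch of an agent loop wiring the four components together.
# `llm` and `tools` are hypothetical placeholders, not a specific API.
def agent_loop(user_input: str, memory: list, llm, tools: dict) -> str:
    # Perception: accept the user's text input and record it
    memory.append({"role": "user", "content": user_input})

    # Planning: ask the LLM to pick a tool (or none) for this task
    plan = llm(messages=memory,
               instruction="Pick a tool and input, or answer directly.")

    # Tools: run the selected tool to gather context or take an action
    if plan.get("tool") in tools:
        result = tools[plan["tool"]](plan["input"])
        memory.append({"role": "system", "content": f"Tool result: {result}"})

    # Respond, then persist the turn so later calls can use it (memory)
    answer = llm(messages=memory,
                 instruction="Answer using the conversation so far.")
    memory.append({"role": "assistant", "content": answer})
    return answer
```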
Note

AI agents vary in design pattern, capability, and complexity. To learn about other agent architectures, including multi-agent systems, see Agentic Design Patterns.
Build AI Agents with MongoDB

MongoDB supports building AI agents with the following components:
Agent Tools

In the context of AI agents, a tool is anything that the agent can programmatically define and invoke. Tools extend an agent's capabilities beyond generating text, enabling it to interact with external systems, retrieve information, and take actions. Tools are typically defined through a specific interface that includes:

- A name and description that help the agent understand when to use the tool.
- The required parameters and their expected formats.
- A function that performs the actual operation when invoked.

Agents use their reasoning capabilities to determine which tool to use, when to use it, and what parameters to pass, based on the user's input and the current task.
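For example, a tool interface expressed in the JSON-schema style that many function-calling APIs use might look like the following sketch. The field names follow common provider conventions and are illustrative rather than tied to any specific API:

```python
# Illustrative tool definition: name, description, and parameter schema.
# The agent reads the name and description to decide when to call the tool.
vector_search_tool_definition = {
    "name": "vector_search_tool",
    "description": "Retrieve documents semantically related to the user's query.",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Natural-language search query.",
            }
        },
        "required": ["query"],
    },
}
```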
In addition to standard MongoDB queries, MongoDB provides several search capabilities that you can implement as tools for your agents:

- MongoDB Vector Search: Perform vector search to retrieve relevant context based on semantic meaning and similarity. To learn more, see the MongoDB Vector Search Overview.
- MongoDB Search: Perform full-text search to retrieve relevant context based on keyword matching and relevance scoring (see the sketch after this list). To learn more, see the MongoDB Search Overview.
- Hybrid search: Combine MongoDB Vector Search with MongoDB Search to leverage the strengths of both approaches. To learn more, see How to Perform Hybrid Search.
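For example, a full-text retrieval tool built on MongoDB Search might look like the following sketch. It assumes a PyMongo collection with a MongoDB Search index named `search_index` over a `text` field; adjust the index and field names to match your data:

```python
# Sketch of a full-text search tool using the $search aggregation stage.
# Assumes a MongoDB Search index named "search_index" on a "text" field.
def full_text_search_tool(query: str, collection) -> list:
    pipeline = [
        {
            "$search": {
                "index": "search_index",
                "text": {"query": query, "path": "text"},
            }
        },
        {"$limit": 5},
        # Keep the matched text and its relevance score
        {"$project": {"_id": 0, "text": 1, "score": {"$meta": "searchScore"}}},
    ]
    return list(collection.aggregate(pipeline))
```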
You can define tools manually or by using frameworks like LangChain and LangGraph, which provide built-in abstractions for creating and calling tools.

Tools are defined as functions that the agent can call to perform a specific task. For example, the following syntax illustrates how you might define a tool that runs a vector search query:

```javascript
async function vectorSearchTool(query) {
  const pipeline = [
    {
      $vectorSearch: {
        // Vector search query pipeline...
      }
    }
  ];
  const results = await collection.aggregate(pipeline).toArray();
  return results;
}
```
```python
def vector_search_tool(query: str) -> str:
    pipeline = [
        {
            "$vectorSearch": {
                # Vector search query pipeline...
            }
        }
    ]
    results = collection.aggregate(pipeline)
    array_of_results = []
    for doc in results:
        array_of_results.append(doc)
    return array_of_results
```
Tool calls are how the agent executes a tool. You can define how tool calls are processed in your agent, or use a framework to handle this for you. Tool calls are typically defined as JSON objects that include the tool name and any parameters to pass to it, so that the agent can invoke the tool with the appropriate arguments. For example, the following syntax illustrates how an agent might call the vector search tool:
{ "tool": "vector_search_tool", "args": { "query": "What is MongoDB?" }, "id": "call_H5TttXb423JfoulF1qVfPN3m" }
Agentic RAG
By using MongoDB as a vector database, you can create retrieval tools that implement agentic RAG, an advanced form of RAG that dynamically orchestrates the retrieval and generation process through AI agents.

This approach enables more complex workflows and user interactions. For example, you can configure your AI agent to choose the best retrieval tool for a task, such as MongoDB Vector Search for semantic search or MongoDB Search for full-text search, as shown in the sketch below. You can also define separate retrieval tools for different collections to further customize your agent's retrieval capabilities.
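As one illustration, the following sketch routes between the retrieval tools defined earlier on this page. The `ask_llm` helper is a hypothetical wrapper around a chat-model call that returns the model's text response:

```python
import json

# Sketch of agentic retrieval routing: the LLM chooses the retrieval
# method best suited to the query. `ask_llm`, `full_text_search_tool`,
# and `vector_search_tool` are the assumed helpers from earlier sketches.
def route_retrieval(user_query: str, collection, ask_llm) -> list:
    decision = ask_llm(
        'Reply with JSON {"tool": "vector_search"} or {"tool": "text_search"}, '
        f"naming the best retrieval method for: {user_query}"
    )
    choice = json.loads(decision).get("tool")
    if choice == "text_search":
        # Keyword-style questions benefit from relevance-scored full-text search
        return full_text_search_tool(user_query, collection)
    # Default to semantic similarity via vector search
    return vector_search_tool(user_query)
```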
Agent Memory

Agent memory involves storing information from previous interactions so that the agent can learn from past experiences and provide more relevant, personalized responses. This is especially important for tasks that require context, such as conversational agents, where the agent must remember previous turns in a conversation to respond coherently and in context. There are two main types of agent memory:

- Short-term memory: Stores information for the current session, such as recent conversation turns and the current task context.
- Long-term memory: Persists information across sessions, which can include past conversations and personalized preferences collected over time.

Because MongoDB is also a document database, you can implement agent memory by storing your agent's interactions in a MongoDB collection. The agent can then query or update this collection as needed. There are several ways to implement agent memory with MongoDB:
- For short-term memory, you can include a `session_id` field when storing interactions to identify a specific session, and then query for interactions with the same ID to pass to the agent as context.
- For long-term memory, you might process multiple interactions with an LLM to extract relevant information, such as user preferences or important context, and then store that information in a separate collection that the agent can query when needed (a sketch of this pattern follows this list).
- To build a robust memory management system that enables more efficient and sophisticated retrieval of conversation history, leverage MongoDB Search or MongoDB Vector Search to store, index, and query important interactions across sessions.
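For example, a minimal sketch of the long-term path might look like the following. It assumes a hypothetical `extract_preferences` helper that prompts an LLM to distill durable preferences from a session transcript, and a PyMongo collection dedicated to long-term memory:

```python
from datetime import datetime, timezone

# Sketch of long-term memory: distill durable facts from a finished
# session and upsert them into a per-user memory document.
def persist_long_term_memory(user_id, session_messages,
                             memory_collection, extract_preferences):
    # Hypothetical LLM call that returns a dict such as
    # {"conversation_tone": "casual", "custom_instructions": [...]}
    preferences = extract_preferences(session_messages)

    # Upsert so that repeat sessions update the same per-user document
    memory_collection.update_one(
        {"user_id": user_id},
        {"$set": {
            "preferences": preferences,
            "last_updated": datetime.now(timezone.utc),
        }},
        upsert=True,
    )
```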
A document in a collection that stores short-term memory might resemble the following:
{ "session_id": "123", "user_id": "jane_doe", "interactions": [ { "role": "user", "content": "What is MongoDB?", "timestamp": "2025-01-01T12:00:00Z" }, { "role": "assistant", "content": "MongoDB is the world's leading modern database.", "timestamp": "2025-01-01T12:00:05Z" } ] }
A document in a collection that stores long-term memory might resemble the following:
{ "user_id": "jane_doe", "last_updated": "2025-05-22T09:15:00Z", "preferences": { "conversation_tone": "casual", "custom_instructions": [ "I prefer concise answers." ], }, "facts": [ { "interests": ["AI", "MongoDB"], } ] }
The following frameworks also provide direct abstractions for agent memory with MongoDB:

Framework | Capabilities
---|---
LangChain | To learn more, see the tutorial.
LangGraph | To learn more, see LangGraph and LangGraph.js.
Get Started

The following tutorial demonstrates how to build an AI agent that implements agentic RAG and memory by using MongoDB, without an agent framework.
Work with a runnable version of this tutorial as a Python notebook.
Prerequisites

To complete this tutorial, you must have the following:

- One of the following MongoDB cluster types:
  - An Atlas cluster running MongoDB version 6.0.11, 7.0.2, or later. Ensure that your IP address is included in your Atlas project's access list.
  - A local Atlas deployment created by using the Atlas CLI. To learn more, see Create a Local Atlas Deployment.
  - A MongoDB Community or Enterprise cluster with Search and Vector Search installed.
- A Voyage AI API key.
- An OpenAI API key.
Note

This tutorial uses models from Voyage AI and OpenAI, but you can modify the code to use the models of your choice.
Procedure

This AI agent can answer questions about a custom data source and perform calculations. It also remembers previous interactions so that it can provide more accurate responses. It uses the following components:

- Perception: Text input.
- Planning: An LLM with several prompts to reason through tasks.
- Tools: A vector search tool and a calculator tool.
- Memory: Stores interactions in a MongoDB collection.
Set up the environment.

Initialize your project and install dependencies.

Create a new project directory, then install the required dependencies:

```shell
mkdir mongodb-ai-agent
cd mongodb-ai-agent
npm init -y
npm install --quiet dotenv mongodb voyageai openai langchain @langchain/community @langchain/core mathjs pdf-parse node-fetch
```

Note
Your project will use the following structure:

```
mongodb-ai-agent
├── .env
├── config.js
├── ingest-data.js
├── tools.js
├── memory.js
├── planning.js
└── index.js
```

Configure the environment.
Create an environment file named `.env` in your project. This file will contain your agent's API keys, your MongoDB connection string, and the MongoDB database and collection names. Replace the placeholder values with your MongoDB connection string and your Voyage AI and OpenAI API keys.

```
MONGODB_URI="<mongodb-connection-string>"
VOYAGE_API_KEY="<voyage-api-key>"
OPENAI_API_KEY="<openai-api-key>"
```

Note
Replace `<mongodb-connection-string>` with the connection string for your Atlas cluster or local deployment. Your connection string should use the following format:

```
mongodb+srv://<db_username>:<db_password>@<clusterName>.<hostname>.mongodb.net
```

To learn more, see Connect to a Cluster via Drivers.
Configure the agent.

Create a file named `config.js` in your project. This file reads your environment variables and connects your application to your MongoDB database and to services like OpenAI.
```javascript
import dotenv from 'dotenv';
import { MongoClient } from 'mongodb';
import OpenAI from "openai";

// Load environment variables from .env file
dotenv.config();

// MongoDB cluster configuration
export const MONGODB_URI = process.env.MONGODB_URI;
export const mongoClient = new MongoClient(MONGODB_URI);
export const agentDb = mongoClient.db("ai_agent_db");
export const vectorCollection = agentDb.collection("embeddings");
export const memoryCollection = agentDb.collection("chat_history");

// Model Configuration
export const OPENAI_MODEL = "gpt-4o";
export const VOYAGE_MODEL = "voyage-3-large";
export const VOYAGE_API_KEY = process.env.VOYAGE_API_KEY;

// Initialize OpenAI Client
export const openAIClient = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});
```
Use MongoDB as a vector database.

Create a file named `ingest-data.js` in your project. This script ingests a sample PDF that contains a recent MongoDB earnings report into a collection in MongoDB by using the `voyage-3-large` embedding model. The code also includes a function that creates a vector search index on your data if one doesn't already exist.

To learn more, see Ingestion.
```javascript
import { PDFLoader } from "@langchain/community/document_loaders/fs/pdf";
import { RecursiveCharacterTextSplitter } from "langchain/text_splitter";
import { vectorCollection } from "./config.js";
import { VOYAGE_API_KEY, VOYAGE_MODEL } from "./config.js";
import { VoyageAIClient } from "voyageai";
import { MONGODB_URI } from "./config.js";
import * as fs from 'fs';
import fetch from 'node-fetch';

console.log("Connecting to MongoDB:", MONGODB_URI);
const EMBEDDING_DIMENSIONS = 1024;

// Use Voyage AI Client SDK to get embeddings
export async function getEmbedding(data, input_type) {
  if (!VOYAGE_API_KEY) {
    throw new Error("VOYAGE_API_KEY is not set in environment variables.");
  }
  try {
    const client = new VoyageAIClient({ apiKey: VOYAGE_API_KEY });
    const response = await client.embed({
      input: [data],
      model: VOYAGE_MODEL,
      input_type: input_type // "document" or "query"
    });
    if (response.data && response.data.length > 0) {
      return response.data[0].embedding;
    }
    throw new Error("No embedding data found from Voyage AI response.");
  } catch (error) {
    console.error("Error generating Voyage AI embedding:", error);
    return null;
  }
}

// Ingest data from a PDF, generate embeddings, and store in MongoDB
export async function ingestData() {
  try {
    // download PDF
    const rawData = await fetch("https://investors.mongodb.com/node/13176/pdf");
    const pdfBuffer = await rawData.arrayBuffer();
    fs.writeFileSync("investor-report.pdf", Buffer.from(pdfBuffer));

    // load and split PDF
    const loader = new PDFLoader("investor-report.pdf");
    const data = await loader.load();
    const textSplitter = new RecursiveCharacterTextSplitter({
      chunkSize: 400,
      chunkOverlap: 20,
    });
    const docs = await textSplitter.splitDocuments(data);
    console.log(`Chunked PDF into ${docs.length} documents.`);

    // generate embeddings and insert
    const insertDocuments = await Promise.all(docs.map(async doc => ({
      document: doc,
      embedding: await getEmbedding(doc.pageContent, "document"),
    })));
    const result = await vectorCollection.insertMany(insertDocuments, { ordered: false });
    console.log("Inserted documents:", result.insertedCount);
  } catch (err) {
    console.error("Ingestion error:", err);
  }
}

// Create a vector search index
export async function createVectorIndex() {
  try {
    // check if the index already exists
    const existingIndexes = await vectorCollection.listSearchIndexes().toArray();
    if (existingIndexes.some(index => index.name === "vector_index")) {
      console.log("Vector index already exists. Skipping creation.");
      return;
    }

    // define your Vector Search index
    const index = {
      name: "vector_index",
      type: "vectorSearch",
      definition: {
        "fields": [
          {
            "type": "vector",
            "path": "embedding",
            "numDimensions": EMBEDDING_DIMENSIONS,
            "similarity": "cosine",
          }
        ]
      }
    }

    // run the helper method to ensure the index is created
    const result = await vectorCollection.createSearchIndex(index);
    console.log(`New index named ${result} is building.`);

    // wait for the index to be ready to query
    console.log("Polling to check if the index is ready. This may take up to a minute.")
    let isQueryable = false;
    while (!isQueryable) {
      const cursor = vectorCollection.listSearchIndexes();
      for await (const index of cursor) {
        if (index.name === result) {
          if (index.queryable) {
            console.log(`${result} is ready for querying.`);
            isQueryable = true;
          } else {
            await new Promise(resolve => setTimeout(resolve, 5000));
          }
        }
      }
    }
  } catch (err) {
    console.error("Error creating vector index:", err);
    throw err;
  }
}
```
Define tools for the agent.

Create a file named `tools.js` in your project. This file defines the tools that the agent can use to answer questions. In this example, you define the following tools:

- `vectorSearchTool`: Runs a vector search query to retrieve relevant documents from your collection.
- `calculatorTool`: Uses the `mathjs` library to perform basic math operations.
```javascript
import { getEmbedding } from './ingest-data.js';
import { vectorCollection } from './config.js';
import { evaluate } from 'mathjs';

// Vector search tool
export async function vectorSearchTool(userInput) {
  const queryEmbedding = await getEmbedding(userInput, "query");
  const pipeline = [
    {
      $vectorSearch: {
        index: "vector_index",
        queryVector: queryEmbedding,
        path: "embedding",
        exact: true,
        limit: 5
      }
    },
    {
      $project: {
        _id: 0,
        "document.pageContent": 1
      }
    }
  ];
  const cursor = vectorCollection.aggregate(pipeline);
  const results = await cursor.toArray();
  return results;
}

// Simple calculator tool
export function calculatorTool(userInput) {
  try {
    const result = evaluate(userInput);
    return String(result);
  } catch (e) {
    return `Error: ${e.message}`;
  }
}
```
Add memory to the agent.

Create a file named `memory.js` in your project. This file defines the system that the agent uses to store its interactions. In this example, you implement short-term memory by defining the following functions:

- `storeChatMessage`: Stores interaction information in a MongoDB collection.
- `retrieveSessionHistory`: Retrieves all interactions for a specific session by using the `session_id` field.
```javascript
import { memoryCollection } from './config.js';

/**
 * Store a chat message in the memory collection.
 * @param {string} sessionId - unique identifier for the chat session
 * @param {string} role - role of the sender (user or system)
 * @param {string} content - content of the message
 */
export async function storeChatMessage(sessionId, role, content) {
  const message = {
    session_id: sessionId,
    role,
    content,
    timestamp: new Date(), // use JS date for timestamp
  };
  await memoryCollection.insertOne(message);
}

/**
 * Retrieve the chat history for a session.
 * @param {string} sessionId - unique identifier for the chat session
 * @returns {Promise<Array<{role: string, content: string}>>}
 */
export async function retrieveSessionHistory(sessionId) {
  const cursor = memoryCollection
    .find({ session_id: sessionId })
    .sort({ timestamp: 1 });
  const messages = [];
  await cursor.forEach(msg => {
    messages.push({ role: msg.role, content: msg.content });
  });
  return messages;
}
```
Define the agent's planning.

Create a file named `planning.js` in your project. This file includes the various prompts and LLM calls that determine the agent's execution flow. In this example, you define the following functions:

- `openAIChatCompletion`: Helper function that calls the OpenAI API to generate responses.
- `toolSelector`: Determines how the LLM selects the appropriate tool for a task.
- `generateResponse`: Orchestrates the agent's execution flow by using tools, calling the LLM, and processing the results.
- `getLlmResponse`: Helper function that generates LLM responses.
```javascript
import { vectorSearchTool, calculatorTool } from './tools.js';
import { storeChatMessage, retrieveSessionHistory } from './memory.js';
import { openAIClient, OPENAI_MODEL } from './config.js';

// OpenAI chat completion helper
export async function openAIChatCompletion(messages) {
  try {
    const completion = await openAIClient.chat.completions.create({
      model: OPENAI_MODEL,
      messages,
      max_tokens: 1024,
    });
    return completion.choices[0].message.content;
  } catch (error) {
    console.error("Error in openAIChatCompletion:", error);
    throw error;
  }
}

// Tool selector function to determine which tool to use based on user input and session history
export async function toolSelector(userInput, sessionHistory = []) {
  const systemPrompt = `
Select the appropriate tool from the options below. Consider the full context of the conversation before deciding.

Tools available:
- vector_search_tool: Retrieve specific context about recent MongoDB earnings and announcements
- calculator_tool: For mathematical operations
- none: For general questions without additional context

Process for making your decision:
1. Analyze if the current question relates to or follows up on a previous vector search query
2. For follow-up questions, incorporate context from previous exchanges to create a comprehensive search query
3. Only use calculator_tool for explicit mathematical operations
4. Default to none only when certain the other tools won't help

When continuing a conversation:
- Identify the specific topic being discussed
- Include relevant details from previous exchanges
- Formulate a query that stands alone but preserves conversation context

Return a JSON object only: {"tool": "selected_tool", "input": "your_query"}
`.trim();

  const messages = [
    { role: "system", content: systemPrompt },
    ...sessionHistory,
    { role: "user", content: userInput }
  ];

  try {
    const response = await openAIChatCompletion(messages);
    let toolCall;
    try {
      toolCall = JSON.parse(response);
    } catch {
      try {
        toolCall = eval(`(${response})`);
      } catch {
        return { tool: "none", input: userInput };
      }
    }
    return {
      tool: toolCall.tool || "none",
      input: toolCall.input || userInput
    };
  } catch (err) {
    console.error("Error in toolSelector:", err);
    return { tool: "none", input: userInput };
  }
}

// Function to get LLM response based on messages and system message content
async function getLlmResponse(messages, systemMessageContent) {
  const systemMessage = { role: "system", content: systemMessageContent };
  let fullMessages;
  if (messages.some(msg => msg.role === "system")) {
    fullMessages = [...messages, systemMessage];
  } else {
    fullMessages = [systemMessage, ...messages];
  }
  const response = await openAIChatCompletion(fullMessages);
  return response;
}

// Function to generate response based on user input
export async function generateResponse(sessionId, userInput) {
  await storeChatMessage(sessionId, "user", userInput);
  const sessionHistory = await retrieveSessionHistory(sessionId);
  const llmInput = [...sessionHistory, { role: "user", content: userInput }];

  const { tool, input: toolInput } = await toolSelector(userInput, sessionHistory);
  console.log("Tool selected:", tool);

  let response;
  if (tool === "vector_search_tool") {
    const contextResults = await vectorSearchTool(toolInput);
    const context = contextResults.map(doc => doc.document?.pageContent || JSON.stringify(doc)).join('\n---\n');
    const systemMessageContent = `
Answer the user's question based on the retrieved context and conversation history.
1. First, understand what specific information the user is requesting
2. Then, locate the most relevant details in the context provided
3. Finally, provide a clear, accurate response that directly addresses the question

If the current question builds on previous exchanges, maintain continuity in your answer.
Only state facts clearly supported by the provided context. If information is not available, say 'I DON'T KNOW'.

Context:
${context}
`.trim();
    response = await getLlmResponse(llmInput, systemMessageContent);
  } else if (tool === "calculator_tool") {
    response = calculatorTool(toolInput);
  } else {
    const systemMessageContent = "You are a helpful assistant. Respond to the user's prompt as best as you can based on the conversation history.";
    response = await getLlmResponse(llmInput, systemMessageContent);
  }

  await storeChatMessage(sessionId, "system", response);
  return response;
}
```
Test the agent.

Finally, create a file named `index.js` in your project. This file runs the agent and lets you interact with it.
```javascript
import readline from 'readline';
import { mongoClient } from './config.js';
import { ingestData, createVectorIndex } from './ingest-data.js';
import { generateResponse } from './planning.js';

// Prompt for user input
async function prompt(question) {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  });
  return new Promise(resolve => rl.question(question, answer => {
    rl.close();
    resolve(answer);
  }));
}

async function main() {
  try {
    await mongoClient.connect();

    const runIngest = await prompt("Ingest sample data? (y/n): ");
    if (runIngest.trim().toLowerCase() === 'y') {
      await ingestData();
      console.log("\nAttempting to create/verify Vector Search Index...");
      await createVectorIndex();
    } else {
      await createVectorIndex(); // ensure index exists even if not ingesting data
    }

    const sessionId = await prompt("Enter a session ID: ");
    while (true) {
      const userQuery = await prompt("\nEnter your query (or type 'quit' to exit): ");
      if (userQuery.trim().toLowerCase() === 'quit') break;
      if (!userQuery.trim()) {
        console.log("Query cannot be empty. Please try again.");
        continue;
      }
      const answer = await generateResponse(sessionId, userQuery);
      console.log("\nAnswer:");
      console.log(answer);
    }
  } finally {
    await mongoClient.close();
  }
}

main();
```
Save your project, then run the following command. When you run the agent:

- Specify whether to ingest the sample data if you haven't already.
- Enter a session ID to start a new session or continue an existing one.
- Ask questions. The agent generates responses based on your tools, your previous interactions, and the prompts defined in the planning phase.

See the following example output for a sample interaction:

```shell
node index.js
```
```
Ingest sample data? (y/n): y
Chunked PDF into 100 documents.
Inserted documents: 100

Attempting to create/verify Vector Search Index...
New index named vector_index is building.
Polling to check if the index is ready. This may take up to a minute.
vector_index is ready for querying.
Enter a session ID: 123

Enter your query (or type 'quit' to exit): What was MongoDB's latest acquisition?
Tool selected: vector_search_tool

Answer:
MongoDB recently acquired Voyage AI, a pioneer in embedding and reranking models that power next-generation AI applications.

Enter your query (or type 'quit' to exit): What do they do?
Tool selected: vector_search_tool

Answer:
Voyage AI is a company that specializes in state-of-the-art embedding and reranking models designed to power next-generation AI applications. These technologies help organizations build more advanced and trustworthy AI capabilities.

Enter your query (or type 'quit' to exit): What is 123+456?
Tool selected: calculator_tool

Answer:
579
```
Tip

If you're using Atlas, you can navigate to the `ai_agent_db.embeddings` namespace in the Atlas UI to verify your embeddings and interactions.
Keep building.

Now that you have a basic AI agent, you can keep developing it by:

- Refining the planning phase with more advanced prompts and LLM calls.
- Implementing long-term memory and more advanced memory systems by using MongoDB Search and MongoDB Vector Search to store and retrieve important interactions across sessions.
Set up the environment.

Initialize your project and install dependencies.

Create a new project directory, then install the required dependencies:

```shell
mkdir mongodb-ai-agent
cd mongodb-ai-agent
pip install --quiet --upgrade pymongo voyageai openai langchain langchain-mongodb langchain-community python-dotenv
```

Note
Your project will use the following structure:

```
mongodb-ai-agent
├── .env
├── config.py
├── ingest_data.py
├── tools.py
├── memory.py
├── planning.py
└── main.py
```

Configure the environment.
Create an environment file named `.env` in your project. This file will contain your agent's API keys, your MongoDB connection string, and the MongoDB database and collection names. Replace the placeholder values with your MongoDB connection string and your Voyage AI and OpenAI API keys.

```
MONGODB_URI="<mongodb-connection-string>"
VOYAGE_API_KEY="<voyage-api-key>"
OPENAI_API_KEY="<openai-api-key>"
```

Note
Replace `<mongodb-connection-string>` with the connection string for your Atlas cluster or local deployment. Your connection string should use the following format:

```
mongodb+srv://<db_username>:<db_password>@<clusterName>.<hostname>.mongodb.net
```

To learn more, see Connect to a Cluster via Drivers.
Configure the agent.

Create a file named `config.py` in your project. This file reads your environment variables and connects your application to your MongoDB database and to services like OpenAI.
```python
from pymongo import MongoClient
from openai import OpenAI
import voyageai
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Environment variables (private)
MONGODB_URI = os.getenv("MONGODB_URI")
VOYAGE_API_KEY = os.getenv("VOYAGE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# MongoDB cluster configuration
mongo_client = MongoClient(MONGODB_URI)
agent_db = mongo_client["ai_agent_db"]
vector_collection = agent_db["embeddings"]
memory_collection = agent_db["chat_history"]

# Model configuration
voyage_client = voyageai.Client(api_key=VOYAGE_API_KEY)
openai_client = OpenAI(api_key=OPENAI_API_KEY)  # named openai_client so planning.py can import it
VOYAGE_MODEL = "voyage-3-large"
OPENAI_MODEL = "gpt-4o"
```
Use MongoDB as a vector database.

Create a file named `ingest_data.py` in your project. This script ingests a sample PDF that contains a recent MongoDB earnings report into a collection in MongoDB by using the `voyage-3-large` embedding model. The code also includes a function that creates a vector search index on your data if one doesn't already exist.

To learn more, see Ingestion.
```python
from config import vector_collection, voyage_client, VOYAGE_MODEL
from pymongo.operations import SearchIndexModel
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import time

# Define a function to generate embeddings
def get_embedding(data, input_type = "document"):
    embeddings = voyage_client.embed(
        data, model = VOYAGE_MODEL, input_type = input_type
    ).embeddings
    return embeddings[0]

# --- Ingest embeddings into MongoDB ---
def ingest_data():
    # Chunk PDF data
    loader = PyPDFLoader("https://investors.mongodb.com/node/13176/pdf")
    data = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=20)
    documents = text_splitter.split_documents(data)
    print(f"Successfully split PDF into {len(documents)} chunks.")

    # Ingest chunked documents into collection
    print("Generating embeddings and ingesting documents...")
    docs_to_insert = []
    for i, doc in enumerate(documents):
        embedding = get_embedding(doc.page_content)
        if embedding:
            docs_to_insert.append({
                "text": doc.page_content,
                "embedding": embedding
            })
    if docs_to_insert:
        result = vector_collection.insert_many(docs_to_insert)
        print(f"Inserted {len(result.inserted_ids)} documents into the collection.")
    else:
        print("No documents were inserted. Check embedding generation process.")

    # --- Create the vector search index ---
    index_name = "vector_index"
    search_index_model = SearchIndexModel(
        definition = {
            "fields": [
                {
                    "type": "vector",
                    "numDimensions": 1024,
                    "path": "embedding",
                    "similarity": "cosine"
                }
            ]
        },
        name=index_name,
        type="vectorSearch"
    )
    try:
        vector_collection.create_search_index(model=search_index_model)
        print(f"Search index '{index_name}' creation initiated.")
    except Exception as e:
        print(f"Error creating search index: {e}")
        return

    # Wait for initial sync to complete
    print("Polling to check if the index is ready. This may take up to a minute.")
    predicate = lambda index: index.get("queryable") is True
    while True:
        indices = list(vector_collection.list_search_indexes(index_name))
        if len(indices) and predicate(indices[0]):
            break
        time.sleep(5)
    print(index_name + " is ready for querying.")
```
Define tools for the agent.

Create a file named `tools.py` in your project. This file defines the tools that the agent can use to answer questions. In this example, you define the following tools:

- `vector_search_tool`: Runs a vector search query to retrieve relevant documents from your collection.
- `calculator_tool`: Uses the `eval()` function to perform basic math operations.
```python
from config import vector_collection
from ingest_data import get_embedding

# Define a vector search tool
def vector_search_tool(user_input: str) -> str:
    query_embedding = get_embedding(user_input)
    pipeline = [
        {
            "$vectorSearch": {
                "index": "vector_index",
                "queryVector": query_embedding,
                "path": "embedding",
                "exact": True,
                "limit": 5
            }
        },
        {
            "$project": {
                "_id": 0,
                "text": 1
            }
        }
    ]
    results = vector_collection.aggregate(pipeline)
    array_of_results = []
    for doc in results:
        array_of_results.append(doc)
    return array_of_results

# Define a simple calculator tool
def calculator_tool(user_input: str) -> str:
    try:
        result = eval(user_input)
        return str(result)
    except Exception as e:
        return f"Error: {str(e)}"
```
Add memory to the agent.

Create a file named `memory.py` in your project. This file defines the system that the agent uses to store its interactions. In this example, you implement short-term memory by defining the following functions:

- `store_chat_message`: Stores interaction information in a MongoDB collection.
- `retrieve_session_history`: Retrieves all interactions for a specific session by using the `session_id` field.
```python
from config import memory_collection
from datetime import datetime
from typing import List

def store_chat_message(session_id: str, role: str, content: str) -> None:
    message = {
        "session_id": session_id,     # Unique identifier for the chat session
        "role": role,                 # Role of the sender (user or system)
        "content": content,           # Content of the message
        "timestamp": datetime.now(),  # Timestamp of when the message was sent
    }
    memory_collection.insert_one(message)

def retrieve_session_history(session_id: str) -> List:
    # Query the collection for messages with a specific "session_id" in ascending order
    cursor = memory_collection.find({"session_id": session_id}).sort("timestamp", 1)

    # Iterate through the cursor and return a list of dicts with each message's role and content
    messages = [{"role": msg["role"], "content": msg["content"]} for msg in cursor]
    return messages
```
Define the agent's planning.

Create a file named `planning.py` in your project. This file includes the various prompts and LLM calls that determine the agent's execution flow. In this example, you define the following functions:

- `tool_selector`: Determines how the LLM selects the appropriate tool for a task.
- `generate_response`: Orchestrates the agent's execution flow by using tools, calling the LLM, and processing the results.
- `get_llm_response`: Helper function that generates LLM responses.
```python
from config import openai_client, OPENAI_MODEL
from tools import vector_search_tool, calculator_tool
from memory import store_chat_message, retrieve_session_history

# Define a tool selector function that decides which tool to use based on user input and message history
def tool_selector(user_input, session_history=None):
    messages = [
        {
            "role": "system",
            "content": (
                "Select the appropriate tool from the options below. Consider the full context of the conversation before deciding.\n\n"
                "Tools available:\n"
                "- vector_search_tool: Retrieve specific context about recent MongoDB earnings and announcements\n"
                "- calculator_tool: For mathematical operations\n"
                "- none: For general questions without additional context\n"
                "Process for making your decision:\n"
                "1. Analyze if the current question relates to or follows up on a previous vector search query\n"
                "2. For follow-up questions, incorporate context from previous exchanges to create a comprehensive search query\n"
                "3. Only use calculator_tool for explicit mathematical operations\n"
                "4. Default to none only when certain the other tools won't help\n\n"
                "When continuing a conversation:\n"
                "- Identify the specific topic being discussed\n"
                "- Include relevant details from previous exchanges\n"
                "- Formulate a query that stands alone but preserves conversation context\n\n"
                "Return a JSON object only: {\"tool\": \"selected_tool\", \"input\": \"your_query\"}"
            )
        }
    ]
    if session_history:
        messages.extend(session_history)
    messages.append({"role": "user", "content": user_input})

    response = openai_client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=messages
    ).choices[0].message.content
    try:
        tool_call = eval(response)
        return tool_call.get("tool"), tool_call.get("input")
    except:
        return "none", user_input

# Define the agent workflow
def generate_response(session_id: str, user_input: str) -> str:
    # Store the user input in the chat history collection
    store_chat_message(session_id, "user", user_input)

    # Initialize a list of inputs to pass to the LLM
    llm_input = []

    # Retrieve the session history for the current session and add it to the LLM input
    session_history = retrieve_session_history(session_id)
    llm_input.extend(session_history)

    # Append the user message in the correct format
    user_message = {
        "role": "user",
        "content": user_input
    }
    llm_input.append(user_message)

    # Call the tool_selector function to determine which tool to use
    tool, tool_input = tool_selector(user_input, session_history)
    print("Tool selected: ", tool)

    # Process based on selected tool
    if tool == "vector_search_tool":
        context = vector_search_tool(tool_input)
        # Construct the system prompt using the retrieved context and append it to the LLM input
        system_message_content = (
            f"Answer the user's question based on the retrieved context and conversation history.\n"
            f"1. First, understand what specific information the user is requesting\n"
            f"2. Then, locate the most relevant details in the context provided\n"
            f"3. Finally, provide a clear, accurate response that directly addresses the question\n\n"
            f"If the current question builds on previous exchanges, maintain continuity in your answer.\n"
            f"Only state facts clearly supported by the provided context. If information is not available, say 'I DON'T KNOW'.\n\n"
            f"Context:\n{context}"
        )
        response = get_llm_response(llm_input, system_message_content)
    elif tool == "calculator_tool":
        # Perform the calculation using the calculator tool
        response = calculator_tool(tool_input)
    else:
        system_message_content = "You are a helpful assistant. Respond to the user's prompt as best as you can based on the conversation history."
        response = get_llm_response(llm_input, system_message_content)

    # Store the system response in the chat history collection
    store_chat_message(session_id, "system", response)
    return response

# Helper function to get the LLM response
def get_llm_response(messages, system_message_content):
    # Add the system message to the messages list
    system_message = {
        "role": "system",
        "content": system_message_content,
    }

    # If the system message should go at the end (for context-based queries)
    if any(msg.get("role") == "system" for msg in messages):
        messages.append(system_message)
    else:
        # For general queries, put system message at beginning
        messages = [system_message] + messages

    # Get response from LLM
    response = openai_client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=messages
    ).choices[0].message.content
    return response
```
Test the agent.

Finally, create a file named `main.py` in your project. This file runs the agent and lets you interact with it.
```python
from config import mongo_client
from ingest_data import ingest_data
from planning import generate_response

if __name__ == "__main__":
    try:
        run_ingest = input("Ingest sample data? (y/n): ")
        if run_ingest.lower() == 'y':
            ingest_data()
        session_id = input("Enter a session ID: ")
        while True:
            user_query = input("\nEnter your query (or type 'quit' to exit): ")
            if user_query.lower() == 'quit':
                break
            if not user_query.strip():
                print("Query cannot be empty. Please try again.")
                continue
            answer = generate_response(session_id, user_query)
            print("\nAnswer:")
            print(answer)
    finally:
        mongo_client.close()
```
Save your project, then run the following command. When you run the agent:

- Specify whether to ingest the sample data if you haven't already.
- Enter a session ID to start a new session or continue an existing one.
- Ask questions. The agent generates responses based on your tools, your previous interactions, and the prompts defined in the planning phase.

See the following example output for a sample interaction:

```shell
python main.py
```
```
Ingest sample data? (y/n): y
Successfully split PDF into 104 chunks.
Generating embeddings and ingesting documents...
Inserted 104 documents into the collection.
Search index 'vector_index' creation initiated.
Polling to check if the index is ready. This may take up to a minute.
vector_index is ready for querying.
Enter a session ID: 123

Enter your query (or type 'quit' to exit): What was MongoDB's latest acquisition?
Tool selected: vector_search_tool

Answer:
MongoDB's latest acquisition was Voyage AI.

Enter your query (or type 'quit' to exit): What do they do?
Tool selected: vector_search_tool

Answer:
Voyage AI is a company that specializes in state-of-the-art embedding and reranking models designed to power next-generation AI applications. These technologies help organizations build more advanced and trustworthy AI capabilities.

Enter your query (or type 'quit' to exit): What is 123+456?
Tool selected: calculator_tool

Answer:
579
```
Tip

If you're using Atlas, you can navigate to the `ai_agent_db.embeddings` namespace in the Atlas UI to verify your embeddings and interactions.
Keep building.

Now that you have a basic AI agent, you can keep developing it by:

- Refining the planning phase with more advanced prompts and LLM calls.
- Implementing long-term memory and more advanced memory systems by using MongoDB Search and MongoDB Vector Search to store and retrieve important interactions across sessions.
Tutorials
For more tutorials about building AI agents with MongoDB, see the following table: