对于AI助手:文档索引位于 https://www.mongodb.com/zh-cn/docs/llms.txt — 通过将 .md 附加到任何URL路径,可以获得所有页面的降价版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

使用 MongoDB 构建 AI 代理

MongoDB提供了多种用于构建AI代理的功能。作为向量和文档数据库, MongoDB支持代理 RAG 的各种搜索方法,以及将代理交互存储在同一数据库中以进行短期和长期代理内存。

开始体验

在生成式人工智能的背景下,AI 代理通常指的是能够通过结合 AI 模型(如 LLM)与一组预定义工具,以自主或半自主方式完成任务的系统。

AI 代理可以使用工具收集上下文、与外部系统交互并执行操作。它们可以确定自己的执行流程(规划),并记住以前的交互情况,为响应提供依据(记忆)。因此,AI 代理最适合需要推理、规划和决策的复杂任务。

显示MongoDB的单一代理架构的图表

AI 代理通常包含以下组件的组合:

感知

您对代理的输入。文本输入是 AI 代理最常见的感知机制,但输入也可以是音频、图像或多模态数据。

计划

代理如何确定下一步做什么。该组件通常包括 LLM 和提示,使用反馈循环和各种 提示工程技术,例如思维链和 reAct,帮助 LLM 推理复杂任务。

AI 代理可以由作为决策者的单个 LLM、具有多个提示的 LLM、多个协同工作的 LLM,或这些方法的任意组合组成。

工具

代理如何为任务收集上下文信息。工具允许代理与外部系统交互,并执行诸如向量搜索、网络搜索或从其他服务调用 API 等操作。

内存

一种用于存储代理交互的系统,使代理能够从过去的经验中学习,从而做出相应的响应。记忆可以是短期记忆(用于当前会话)或长期记忆(在会话之间持久化)。

注意

AI 代理在设计模式、功能和复杂性上各不相同。要学习其他代理架构,包括多代理系统,请参阅代理设计模式。

MongoDB支持使用以下组件来构建AI助手:

  • 工具:利用 MongoDB 的搜索功能作为工具,帮助代理检索相关信息并实现代理型 RAG。

  • 存储器:将代理交互数据存储在 MongoDB 集合中,以满足短期和长期记忆需求。

在 AI 代理的上下文中,工具是指可以由代理以编程方式定义和调用的任何东西。工具扩展了代理的功能,不仅限于生成文本,还使其能够与外部系统交互、检索信息并采取操作。工具通常通过特定接口定义,其中包括:

  • 名称和说明,帮助代理了解何时使用工具。

  • 所需参数及其预期格式。

  • 在被调用时执行实际操作的函数。

代理利用其推理功能,根据用户的输入和当前任务,确定使用哪种工具、何时使用以及提供哪些参数。

除了标准MongoDB查询之外, MongoDB还提供多种搜索功能,您可以将这些功能实现为代理的工具。

  • MongoDB Vector Search:执行向量搜索,根据语义和相似性检索。要学习;了解更多信息,请参阅MongoDB Vector Search 概述。

  • MongoDB Search:执行全文搜索,根据关键字匹配和相关性评分来检索相关上下文。要学习;了解更多信息,请参阅MongoDB Search 概述。

  • 混合搜索:将MongoDB 向量搜索 与MongoDB Search 相结合,以充分利用两种方法的优势。要学习;了解详情,请参阅如何执行混合搜索。

您可以手动定义工具,也可以使用框架如 LangChain 和 LangGraph 等框架来定义工具,这些框架为工具的创建和调用提供了内置抽象。

工具被定义为智能体可调用的函数,用于执行特定任务。例如,以下语法说明了如何定义一个运行向量搜索查询的工具:

async function vectorSearchTool(query) {
const pipeline = [
{
$vectorSearch: {
// Vector search query pipeline...
}
}
];
const results = await collection.aggregate(pipeline).toArray();
return results;
}
def vector_search_tool(query: str) -> str:
pipeline = [
{
"$vectorSearch": {
# Vector search query pipeline...
}
}
]
results = collection.aggregate(pipeline)
array_of_results = []
for doc in results:
array_of_results.append(doc)
return array_of_results

工具调用是代理用来执行工具的调用。您可以定义如何在代理中进程工具调用,或使用框架来处理此问题。这些通常定义为JSON对象,其中包括工具名称和其他要传递给该工具的参数,以便代理可以使用适当的参数调用该工具。示例,以下语法说明了代理如何调用向量搜索工具:

{
"tool": "vector_search_tool",
"args": { "query": "What is MongoDB?" },
"id": "call_H5TttXb423JfoulF1qVfPN3m"
}

通过使用MongoDB作为向量数据库,您可以创建实现代理 RAG 的检索工具,代理 RAG 是RAG的高级形式,允许您通过AI代理动态编排检索和生成进程。

显示采用MongoDB的代理 RAG 架构的示意图

这种方法支持更复杂的工作流程和用户交互。示例,您可以配置AI代理,根据任务确定最佳检索工具,例如使用MongoDB Vector Search 进行语义搜索,使用MongoDB Search 进行全文搜索。您还可以为不同的集合定义不同的检索工具,以进一步自定义代理的检索功能。

代理的记忆涉及存储先前交互的信息,以便代理可以从过去的经验中学习,并提供更相关和个性化的响应。这对于需要上下文的任务尤其重要,例如会话代理,其中代理需要记住会话中的先前回合,以提供连贯且上下文相关的响应。代理记忆主要有两种类型:

  • 短期记忆:存储当前会话的信息,如最近的对话回合和当前任务上下文。

  • 长期记忆:跨会话保存信息,其中可以包括过去的对话和一段时间内的个性化首选项。

由于MongoDB也是一个文档数据库,因此您可以通过将代理的交互存储在MongoDB集合中来实现代理的内存。然后,代理可以根据需要查询或更新此集合。使用MongoDB实现代理内存有多种方法:

  • 对于短期记忆,您可以在存储交互时包含一个 session_id 字段,用于标识特定会话,然后查询具有相同 ID 的交互,将其作为上下文传递给代理。

  • 为了长期记忆,您可能会处理与 LLM 的多次交互,以提取相关信息,例如用户偏好或重要的上下文,然后将这些信息存储在一个单独的集合中,以便代理在需要时查询。

  • 要构建强大的内存管理系统,启用更高效、更复杂地检索对话历史记录,请利用MongoDB SearchMongoDB 向量搜索 来存储、索引和查询跨会话的重要交互。

存储短期记忆的集合中的文档可能类似于以下内容:

{
"session_id": "123",
"user_id": "jane_doe",
"interactions":
[
{
"role": "user",
"content": "What is MongoDB?",
"timestamp": "2025-01-01T12:00:00Z"
},
{
"role": "assistant",
"content": "MongoDB is the world's leading modern database.",
"timestamp": "2025-01-01T12:00:05Z"
}
]
}

存储长期记忆的集合中的文档可能类似于以下内容:

{
"user_id": "jane_doe",
"last_updated": "2025-05-22T09:15:00Z",
"preferences": {
"conversation_tone": "casual",
"custom_instructions": [
"I prefer concise answers."
],
},
"facts": [
{
"interests": ["AI", "MongoDB"],
}
]
}

以下框架还使用MongoDB为代理内存提供直接抽象:

框架
功能

LangChain

  • MongoDBChatMessageHistory:聊天消息历史组件

  • MongoDBAtlasSemanticCache:语义缓存组件

要学习;了解更多信息,请参阅教程。

LangGraph

  • MongoDBSaver:可用于持久性的短期内存检查指针

  • MongoDBStore:用于在MongoDB中存储内存的长期文档存储(仅在Python集成中可用)

要学习;了解更多信息,请参阅 LangGraphLangGraph.js。

以下教程演示了如何在没有代理框架的情况下,使用MongoDB提供代理 RAG 和内存构建AI代理。


➤ 使用选择语言下拉菜单设立本教程的语言。


使用本教程的可运行版本以作为 Python 笔记本。

如要完成本教程,您必须具备以下条件:

  • 以下MongoDB 集群类型之一:

  • Voyage AI API 密钥。

  • 一个 OpenAI API 密钥。

注意

本教程使用来自 Voyage AI 和 OpenAI 的模型,但您可以修改代码以使用您选择的模型。

此 AI 代理可用于回答有关自定义数据源的问题并执行计算。它还可以记住之前的交互,以便提供更准确的响应。它使用以下组件:

  • 感知:文本输入。

  • 规划:使用一个LLM和多个提示来推理完成任务。

  • 工具:向量搜索工具和计算器工具。

  • Memory:将交互存储在 MongoDB 集合中。

1
  1. 初始化项目并安装依赖项。

    创建一个新的项目目录,然后安装所需的依赖项:

    mkdir mongodb-ai-agent
    cd mongodb-ai-agent
    npm init -y
    npm install --quiet dotenv mongodb voyageai openai langchain @langchain/community @langchain/core mathjs pdf-parse@1

    注意

    您的项目将使用以下结构:

    mongodb-ai-agent
    ├── .env
    ├── config.js
    ├── ingest-data.js
    ├── tools.js
    ├── memory.js
    ├── planning.js
    └── index.js
  2. 配置环境。

    在项目中创建名为 .env 的环境文件。此文件将包含代理的API密钥、 MongoDB连接字符串以及MongoDB 数据库和集合名称。

    将以下代码复制并粘贴到 .env文件。

    将占位符值替换为MongoDB连接字符串以及 Voyage AI和 OpenAI API密钥。

    MONGODB_URI="<mongodb-connection-string>"
    VOYAGE_API_KEY="<voyage-api-key>"
    OPENAI_API_KEY= "<openai-api-key>"

    注意

    <connection-string> 替换为您的 Atlas 集群或本地部署的连接字符串。

    连接字符串应使用以下格式:

    mongodb+srv://<db_username>:<db_password>@<clusterName>.<hostname>.mongodb.net

    要学习;了解更多信息,请参阅通过客户端库连接到集群。

    连接字符串应使用以下格式:

    mongodb://localhost:<port-number>/?directConnection=true

    要学习;了解更多信息,请参阅连接字符串。

2

在项目中创建一个名为 config.js 的文件。此文件将读取您的环境变量,并将应用程序连接到MongoDB 数据库和 OpenAI 等服务。

将以下代码复制并粘贴到 config.js文件中。

import dotenv from 'dotenv';
import { MongoClient } from 'mongodb';
import OpenAI from "openai";
// Load environment variables from .env file
dotenv.config();
// MongoDB cluster configuration
export const MONGODB_URI = process.env.MONGODB_URI;
export const mongoClient = new MongoClient(MONGODB_URI);
export const agentDb = mongoClient.db("ai_agent_db");
export const vectorCollection = agentDb.collection("embeddings");
export const memoryCollection = agentDb.collection("chat_history");
// Model Configuration
export const OPENAI_MODEL = "gpt-4o";
export const VOYAGE_MODEL = "voyage-3-large";
export const VOYAGE_API_KEY = process.env.VOYAGE_API_KEY;
// Initialize OpenAI Client
export const openAIClient = new OpenAI({ apiKey: process.env.OPENAI_API_KEY,});
3

在项目中创建一个名为 ingest-data.js 的文件。此脚本使用 voyage-3-large 嵌入模型将包含最新MongoDB收益报告的示例PDF 提取到MongoDB中的集合中。此代码还包含一个函数,用于在数据上创建向量搜索索引(如果尚不存在)。

要了解更多信息,请参阅摄取。

将以下代码复制并粘贴到 ingest-data.js文件中。

import { PDFLoader } from "@langchain/community/document_loaders/fs/pdf";
import { RecursiveCharacterTextSplitter } from "langchain/text_splitter";
import { vectorCollection } from "./config.js";
import { VOYAGE_API_KEY, VOYAGE_MODEL } from "./config.js";
import { VoyageAIClient } from "voyageai";
import { MONGODB_URI } from "./config.js";
import * as fs from 'fs';
import fetch from 'node-fetch';
console.log("Connecting to MongoDB:", MONGODB_URI);
const EMBEDDING_DIMENSIONS = 1024;
// Use Voyage AI Client SDK to get embeddings
export async function getEmbedding(data, input_type) {
if (!VOYAGE_API_KEY) {
throw new Error("VOYAGE_API_KEY is not set in environment variables.");
}
try {
const client = new VoyageAIClient({ apiKey: VOYAGE_API_KEY });
const response = await client.embed({
input: [data],
model: VOYAGE_MODEL,
input_type: input_type // "document" or "query"
});
if (response.data && response.data.length > 0) {
return response.data[0].embedding;
}
throw new Error("No embedding data found from Voyage AI response.");
}
catch (error) {
console.error("Error generating Voyage AI embedding:", error);
return null;
}
}
// Ingest data from a PDF, generate embeddings, and store in MongoDB
export async function ingestData() {
try {
// download PDF
const rawData = await fetch("https://investors.mongodb.com/node/13176/pdf");
const pdfBuffer = await rawData.arrayBuffer();
fs.writeFileSync("investor-report.pdf", Buffer.from(pdfBuffer));
// load and split PDF
const loader = new PDFLoader("investor-report.pdf");
const data = await loader.load();
const textSplitter = new RecursiveCharacterTextSplitter({
chunkSize: 400,
chunkOverlap: 20,
});
const docs = await textSplitter.splitDocuments(data);
console.log(`Chunked PDF into ${docs.length} documents.`);
// generate embeddings and insert
const insertDocuments = await Promise.all(docs.map(async doc => ({
document: doc,
embedding: await getEmbedding(doc.pageContent, "document"),
})));
const result = await vectorCollection.insertMany(insertDocuments, { ordered: false });
console.log("Inserted documents:", result.insertedCount);
} catch (err) {
console.error("Ingestion error:", err);
}
}
// Create a vector search index
export async function createVectorIndex() {
try {
// check if the index already exists
const existingIndexes = await vectorCollection.listSearchIndexes().toArray();
if (existingIndexes.some(index => index.name === "vector_index")) {
console.log("Vector index already exists. Skipping creation.");
return;
}
// define your Vector Search index
const index = {
name: "vector_index",
type: "vectorSearch",
definition: {
"fields": [
{
"type": "vector",
"path": "embedding",
"numDimensions": EMBEDDING_DIMENSIONS,
"similarity": "cosine",
}
]
}
}
// run the helper method to ensure the index is created
const result = await vectorCollection.createSearchIndex(index);
console.log(`New index named ${result} is building.`);
// wait for the index to be ready to query
console.log("Polling to check if the index is ready. This may take up to a minute.")
let isQueryable = false;
while (!isQueryable) {
const cursor = vectorCollection.listSearchIndexes();
for await (const index of cursor) {
if (index.name === result) {
if (index.queryable) {
console.log(`${result} is ready for querying.`);
isQueryable = true;
} else {
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
}
} catch (err) {
console.error("Error creating vector index:", err);
throw err;
}
}
4

在项目中创建一个名为 tools.js 的文件。该文件定义了代理可以用来回答问题的工具。在此示例中,您定义以下工具:

  • vectorSearchTool运行向量搜索查询,从集合中检索相关文档。

  • calculatorTool:使用 mathjs 库进行基本数学运算。

将以下代码复制并粘贴到您的tools.js文件中。

import { getEmbedding } from './ingest-data.js';
import { vectorCollection } from './config.js';
import { evaluate } from 'mathjs';
// Vector search tool
export async function vectorSearchTool(userInput) {
const queryEmbedding = await getEmbedding(userInput, "query");
const pipeline = [
{
$vectorSearch: {
index: "vector_index",
queryVector: queryEmbedding,
path: "embedding",
exact: true,
limit: 5
}
},
{
$project: {
_id: 0,
"document.pageContent": 1
}
}
];
const cursor = vectorCollection.aggregate(pipeline);
const results = await cursor.toArray();
return results;
}
// Simple calculator tool
export function calculatorTool(userInput) {
try {
const result = evaluate(userInput);
return String(result);
} catch (e) {
return `Error: ${e.message}`;
}
}
5

在项目中创建一个名为 memory.js 的文件。此文件定义了代理用于存储其交互的系统。在此示例中,您通过定义以下函数来实现短期记忆:

  • storeChatMessage:将交互信息存储在 MongoDB 集合中。

  • retrieveSessionHistory:通过使用 session_id 字段来获取特定会话的所有交互。

将以下代码复制并粘贴到memory.js文件中。

import { memoryCollection } from './config.js';
/**
* Store a chat message in the memory collection.
* @param {string} sessionId - unique identifier for the chat session
* @param {string} role - role of the sender (user or system)
* @param {string} content - content of the message
*/
export async function storeChatMessage(sessionId, role, content) {
const message = {
session_id: sessionId,
role,
content,
timestamp: new Date(), // use JS date for timestamp
};
await memoryCollection.insertOne(message);
}
/**
* Retrieve the chat history for a session.
* @param {string} sessionId - unique identifier for the chat session
* @returns {Promise<Array<{role: string, content: string}>>}
*/
export async function retrieveSessionHistory(sessionId) {
const cursor = memoryCollection
.find({ session_id: sessionId })
.sort({ timestamp: 1 });
const messages = [];
await cursor.forEach(msg => {
messages.push({ role: msg.role, content: msg.content });
});
return messages;
}
6

在项目中创建一个名为 planning.js 的文件。该文件将包括各种提示和 LLM 调用,以确定代理的执行流程。在此示例中,您定义以下函数:

  • openAIChatCompletion:调用 OpenAI API来生成响应的辅助函数。

  • toolSelector:决定 LLM 如何为任务选择合适的工具。

  • generateAnswer:通过使用工具、调用 LLM 和处理结果来编排代理执行流。

  • getLLMResponse:用于生成 LLM 响应的辅助函数。

将以下代码复制并粘贴到planning.js文件中。

import { vectorSearchTool, calculatorTool } from './tools.js';
import { storeChatMessage, retrieveSessionHistory } from './memory.js';
import { openAIClient, OPENAI_MODEL } from './config.js';
// OpenAI chat completion helper
export async function openAIChatCompletion(messages) {
try {
const completion = await openAIClient.chat.completions.create({
model: OPENAI_MODEL,
messages,
max_tokens: 1024,
});
return completion.choices[0].message.content;
} catch (error) {
console.error("Error in openAIChatCompletion:", error);
throw error;
}
}
// Tool selector function to determine which tool to use based on user input and session history
export async function toolSelector(userInput, sessionHistory = []) {
const systemPrompt = `
Select the appropriate tool from the options below. Consider the full context of the conversation before deciding.
Tools available:
- vector_search_tool: Retrieve specific context about recent MongoDB earnings and announcements
- calculator_tool: For mathematical operations
- none: For general questions without additional context
Process for making your decision:
1. Analyze if the current question relates to or follows up on a previous vector search query
2. For follow-up questions, incorporate context from previous exchanges to create a comprehensive search query
3. Only use calculator_tool for explicit mathematical operations
4. Default to none only when certain the other tools won't help
When continuing a conversation:
- Identify the specific topic being discussed
- Include relevant details from previous exchanges
- Formulate a query that stands alone but preserves conversation context
Return a JSON object only: {"tool": "selected_tool", "input": "your_query"}
`.trim();
const messages = [
{ role: "system", content: systemPrompt },
...sessionHistory,
{ role: "user", content: userInput }
];
try {
const response = await openAIChatCompletion(messages);
let toolCall;
try {
toolCall = JSON.parse(response);
} catch {
try {
toolCall = eval(`(${response})`);
} catch {
return { tool: "none", input: userInput };
}
}
return {
tool: toolCall.tool || "none",
input: toolCall.input || userInput
};
} catch (err) {
console.error("Error in toolSelector:", err);
return { tool: "none", input: userInput };
}
}
// Function to get LLM response based on messages and system message content
async function getLlmResponse(messages, systemMessageContent) {
const systemMessage = { role: "system", content: systemMessageContent };
let fullMessages;
if (messages.some(msg => msg.role === "system")) {
fullMessages = [...messages, systemMessage];
} else {
fullMessages = [systemMessage, ...messages];
}
const response = await openAIChatCompletion(fullMessages);
return response;
}
// Function to generate response based on user input
export async function generateResponse(sessionId, userInput) {
await storeChatMessage(sessionId, "user", userInput);
const sessionHistory = await retrieveSessionHistory(sessionId);
const llmInput = [...sessionHistory, { role: "user", content: userInput }];
const { tool, input: toolInput } = await toolSelector(userInput, sessionHistory);
console.log("Tool selected:", tool);
let response;
if (tool === "vector_search_tool") {
const contextResults = await vectorSearchTool(toolInput);
const context = contextResults.map(doc => doc.document?.pageContent || JSON.stringify(doc)).join('\n---\n');
const systemMessageContent = `
Answer the user's question based on the retrieved context and conversation history.
1. First, understand what specific information the user is requesting
2. Then, locate the most relevant details in the context provided
3. Finally, provide a clear, accurate response that directly addresses the question
If the current question builds on previous exchanges, maintain continuity in your answer.
Only state facts clearly supported by the provided context. If information is not available, say 'I DON'T KNOW'.
Context:
${context}
`.trim();
response = await getLlmResponse(llmInput, systemMessageContent);
} else if (tool === "calculator_tool") {
response = calculatorTool(toolInput);
} else {
const systemMessageContent = "You are a helpful assistant. Respond to the user's prompt as best as you can based on the conversation history.";
response = await getLlmResponse(llmInput, systemMessageContent);
}
await storeChatMessage(sessionId, "system", response);
return response;
}
7

最后,在项目中创建一个名为 index.js 的文件。此文件运行代理并允许您与其交互。

将以下代码复制并粘贴到索引文件中。

import readline from 'readline';
import { mongoClient } from './config.js';
import { ingestData, createVectorIndex } from './ingest-data.js';
import { generateResponse } from './planning.js';
// Prompt for user input
async function prompt(question) {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
return new Promise(resolve => rl.question(question, answer => {
rl.close();
resolve(answer);
}));
}
async function main() {
try {
await mongoClient.connect();
const runIngest = await prompt("Ingest sample data? (y/n): ");
if (runIngest.trim().toLowerCase() === 'y') {
await ingestData();
console.log("\nAttempting to create/verify Vector Search Index...");
await createVectorIndex();
} else {
await createVectorIndex(); // ensure index exists even if not ingesting data
}
const sessionId = await prompt("Enter a session ID: ");
while (true) {
const userQuery = await prompt("\nEnter your query (or type 'quit' to exit): ");
if (userQuery.trim().toLowerCase() === 'quit') break;
if (!userQuery.trim()) {
console.log("Query cannot be empty. Please try again.");
continue;
}
const answer = await generateResponse(sessionId, userQuery);
console.log("\nAnswer:");
console.log(answer);
}
} finally {
await mongoClient.close();
}
}
main();

保存项目,然后运行以下命令。当您运行代理时:

  • 如果还没有,请指示代理引入示例数据。

  • 请输入会话 ID,开始新会话或继续现有会话。

  • 提出问题。代理会根据您的工具、上一个交互以及在规划阶段定义的提示生成响应。

请参阅示例输出,获取示例交互:

node index.js
Ingest sample data? (y/n): y
Chunked PDF into 100 documents.
Inserted documents: 100
Attempting to create/verify Vector Search Index...
New index named vector_index is building.
Polling to check if the index is ready. This may take up to a minute.
vector_index is ready for querying.
Enter a session ID: 123
Enter your query (or type 'quit' to exit): What was MongoDB's latest acquisition?
Tool selected: vector_search_tool
Answer:
MongoDB recently acquired Voyage AI, a pioneer in embedding and reranking models that power next-generation AI applications.
Enter your query (or type 'quit' to exit): What do they do?
Tool selected: vector_search_tool
Answer: Voyage AI is a company that specializes in
state-of-the-art embedding and reranking models designed to
power next-generation AI applications. These technologies help
organizations build more advanced and trustworthy AI
capabilities.
Enter your query (or type 'quit' to exit): What is 123+456?
Tool selected: calculator_tool
Answer:
579

提示

如果您使用的是Atlas ,则可以导航到 ai_agent_db.embeddings 命名空间在Atlas用户界面中。以验证嵌入和交互。

8

现在您已经有了一个基本的 AI 代理,可以通过以下方式继续开发:

  • 提升向量搜索工具的性能微调 RAG 管道。

  • 为代理添加更多工具,例如混合全文搜索工具。

  • 通过使用更高级的提示和 LLM 调用来完善规划阶段。

  • 使用MongoDB SearchMongoDB 向量搜索 来存储和检索跨会话的重要交互,从而实现长期记忆和更高级的记忆系统。

1
  1. 初始化项目并安装依赖项。

    创建一个新的项目目录,然后安装所需的依赖项:

    mkdir mongodb-ai-agent
    cd mongodb-ai-agent
    pip install --quiet --upgrade pymongo voyageai openai langchain langchain-mongodb
    langchain-community python-dotenv

    注意

    您的项目将使用以下结构:

    mongodb-ai-agent
    ├── .env
    ├── config.py
    ├── ingest_data.py
    ├── tools.py
    ├── memory.py
    ├── planning.py
    ├── main.py
  2. 配置环境。

    在项目中创建名为 .env 的环境文件。此文件将包含代理的API密钥、 MongoDB连接字符串以及MongoDB 数据库和集合名称。

    将以下代码复制并粘贴到 .env文件。

    将占位符值替换为MongoDB连接字符串以及 Voyage AI和 OpenAI API密钥。

    MONGODB_URI="<mongodb-connection-string>"
    VOYAGE_API_KEY="<voyage-api-key>"
    OPENAI_API_KEY= "<openai-api-key>"

    注意

    <connection-string> 替换为您的 Atlas 集群或本地部署的连接字符串。

    连接字符串应使用以下格式:

    mongodb+srv://<db_username>:<db_password>@<clusterName>.<hostname>.mongodb.net

    要学习;了解更多信息,请参阅通过客户端库连接到集群。

    连接字符串应使用以下格式:

    mongodb://localhost:<port-number>/?directConnection=true

    要学习;了解更多信息,请参阅连接字符串。

2

在项目中创建一个名为 config.py 的文件。此文件将读取您的环境变量,并将应用程序连接到MongoDB 数据库和 OpenAI 等服务。

将以下代码复制并粘贴到您的 config.py 文件中。

from pymongo import MongoClient
from openai import OpenAI
import voyageai
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
# Environment variables (private)
MONGODB_URI = os.getenv("MONGODB_URI")
VOYAGE_API_KEY = os.getenv("VOYAGE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# MongoDB cluster configuration
mongo_client = MongoClient(MONGODB_URI)
agent_db = mongo_client["ai_agent_db"]
vector_collection = agent_db["embeddings"]
memory_collection = agent_db["chat_history"]
# Model configuration
voyage_client = voyageai.Client(api_key=VOYAGE_API_KEY)
openai_client = OpenAI(api_key=OPENAI_API_KEY)
VOYAGE_MODEL = "voyage-3-large"
OPENAI_MODEL = "gpt-4o"
3

在项目中创建一个名为 ingest_data.py 的文件。此脚本使用 voyage-3-large 嵌入模型将包含最新MongoDB收益报告的示例PDF 提取到MongoDB中的集合中。此代码还包含一个函数,用于在数据上创建向量搜索索引(如果尚不存在)。

要了解更多信息,请参阅摄取。

将以下代码复制并粘贴到 ingest_data.py文件中。

from config import vector_collection, voyage_client, VOYAGE_MODEL
from pymongo.operations import SearchIndexModel
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
import time
# Define a function to generate embeddings
def get_embedding(data, input_type = "document"):
embeddings = voyage_client.embed(
data, model = VOYAGE_MODEL, input_type = input_type
).embeddings
return embeddings[0]
# --- Ingest embeddings into MongoDB ---
def ingest_data():
# Chunk PDF data
loader = PyPDFLoader("https://investors.mongodb.com/node/13176/pdf")
data = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=20)
documents = text_splitter.split_documents(data)
print(f"Successfully split PDF into {len(documents)} chunks.")
# Ingest chunked documents into collection
print("Generating embeddings and ingesting documents...")
docs_to_insert = []
for i, doc in enumerate(documents):
embedding = get_embedding(doc.page_content)
if embedding:
docs_to_insert.append({
"text": doc.page_content,
"embedding": embedding
})
if docs_to_insert:
result = vector_collection.insert_many(docs_to_insert)
print(f"Inserted {len(result.inserted_ids)} documents into the collection.")
else:
print("No documents were inserted. Check embedding generation process.")
# --- Create the vector search index ---
index_name = "vector_index"
search_index_model = SearchIndexModel(
definition = {
"fields": [
{
"type": "vector",
"numDimensions": 1024,
"path": "embedding",
"similarity": "cosine"
}
]
},
name=index_name,
type="vectorSearch"
)
try:
vector_collection.create_search_index(model=search_index_model)
print(f"Search index '{index_name}' creation initiated.")
except Exception as e:
print(f"Error creating search index: {e}")
return
# Wait for initial sync to complete
print("Polling to check if the index is ready. This may take up to a minute.")
predicate=None
if predicate is None:
predicate = lambda index: index.get("queryable") is True
while True:
indices = list(vector_collection.list_search_indexes(index_name))
if len(indices) and predicate(indices[0]):
break
time.sleep(5)
print(index_name + " is ready for querying.")
4

在项目中创建一个名为 tools.py 的文件。该文件定义了代理可以用来回答问题的工具。在此示例中,您定义以下工具:

  • vector_search_tool运行向量搜索查询,从集合中检索相关文档。

  • calculator_tool:使用 eval() 函数进行基本数学运算。

将以下代码复制并粘贴到您的 tools.py 文件中。

from config import vector_collection
from ingest_data import get_embedding
# Define a vector search tool
def vector_search_tool(user_input: str) -> str:
query_embedding = get_embedding(user_input)
pipeline = [
{
"$vectorSearch": {
"index": "vector_index",
"queryVector": query_embedding,
"path": "embedding",
"exact": True,
"limit": 5
}
}, {
"$project": {
"_id": 0,
"text": 1
}
}
]
results = vector_collection.aggregate(pipeline)
array_of_results = []
for doc in results:
array_of_results.append(doc)
return array_of_results
# Define a simple calculator tool
def calculator_tool(user_input: str) -> str:
try:
result = eval(user_input)
return str(result)
except Exception as e:
return f"Error: {str(e)}"
5

在项目中创建一个名为 memory.py 的文件。此文件定义了代理用于存储其交互的系统。在此示例中,您通过定义以下函数来实现短期记忆:

  • store_chat_message:将交互信息存储在 MongoDB 集合中。

  • retrieve_session_history:通过使用 session_id 字段来获取特定会话的所有交互。

将以下代码复制并粘贴到您的 memory.py 文件中。

from config import memory_collection
from datetime import datetime
from typing import List
def store_chat_message(session_id: str, role: str, content: str) -> None:
message = {
"session_id": session_id, # Unique identifier for the chat session
"role": role, # Role of the sender (user or system)
"content": content, # Content of the message
"timestamp": datetime.now(), # Timestamp of when the message was sent
}
memory_collection.insert_one(message)
def retrieve_session_history(session_id: str) -> List:
# Query the collection for messages with a specific "session_id" in ascending order
cursor = memory_collection.find({"session_id": session_id}).sort("timestamp", 1)
# Iterate through the cursor and return a JSON object with the message role and content
if cursor:
messages = [{"role": msg["role"], "content": msg["content"]} for msg in cursor]
else:
messages = []
return messages
6

在项目中创建一个名为 planning.py 的文件。该文件将包括各种提示和 LLM 调用,以确定代理的执行流程。在此示例中,您定义以下函数:

  • tool_selector:决定 LLM 如何为任务选择合适的工具。

  • generate_answer:通过使用工具、调用 LLM 和处理结果来编排代理执行流。

  • get_llm_response:用于生成 LLM 响应的辅助函数。

将以下代码复制并粘贴到您的 planning.py 文件中。

from config import openai_client, OPENAI_MODEL
from tools import vector_search_tool, calculator_tool
from memory import store_chat_message, retrieve_session_history
# Define a tool selector function that decides which tool to use based on user input and message history
def tool_selector(user_input, session_history=None):
messages = [
{
"role": "system",
"content": (
"Select the appropriate tool from the options below. Consider the full context of the conversation before deciding.\n\n"
"Tools available:\n"
"- vector_search_tool: Retrieve specific context about recent MongoDB earnings and announcements\n"
"- calculator_tool: For mathematical operations\n"
"- none: For general questions without additional context\n"
"Process for making your decision:\n"
"1. Analyze if the current question relates to or follows up on a previous vector search query\n"
"2. For follow-up questions, incorporate context from previous exchanges to create a comprehensive search query\n"
"3. Only use calculator_tool for explicit mathematical operations\n"
"4. Default to none only when certain the other tools won't help\n\n"
"When continuing a conversation:\n"
"- Identify the specific topic being discussed\n"
"- Include relevant details from previous exchanges\n"
"- Formulate a query that stands alone but preserves conversation context\n\n"
"Return a JSON object only: {\"tool\": \"selected_tool\", \"input\": \"your_query\"}"
)
}
]
if session_history:
messages.extend(session_history)
messages.append({"role": "user", "content": user_input})
response = openai_client.chat.completions.create(
model=OPENAI_MODEL,
messages=messages
).choices[0].message.content
try:
tool_call = eval(response)
return tool_call.get("tool"), tool_call.get("input")
except:
return "none", user_input
# Define the agent workflow
def generate_response(session_id: str, user_input: str) -> str:
# Store the user input in the chat history collection
store_chat_message(session_id, "user", user_input)
# Initialize a list of inputs to pass to the LLM
llm_input = []
# Retrieve the session history for the current session and add it to the LLM input
session_history = retrieve_session_history(session_id)
llm_input.extend(session_history)
# Append the user message in the correct format
user_message = {
"role": "user",
"content": user_input
}
llm_input.append(user_message)
# Call the tool_selector function to determine which tool to use
tool, tool_input = tool_selector(user_input, session_history)
print("Tool selected: ", tool)
# Process based on selected tool
if tool == "vector_search_tool":
context = vector_search_tool(tool_input)
# Construct the system prompt using the retrieved context and append it to the LLM input
system_message_content = (
f"Answer the user's question based on the retrieved context and conversation history.\n"
f"1. First, understand what specific information the user is requesting\n"
f"2. Then, locate the most relevant details in the context provided\n"
f"3. Finally, provide a clear, accurate response that directly addresses the question\n\n"
f"If the current question builds on previous exchanges, maintain continuity in your answer.\n"
f"Only state facts clearly supported by the provided context. If information is not available, say 'I DON'T KNOW'.\n\n"
f"Context:\n{context}"
)
response = get_llm_response(llm_input, system_message_content)
elif tool == "calculator_tool":
# Perform the calculation using the calculator tool
response = calculator_tool(tool_input)
else:
system_message_content = "You are a helpful assistant. Respond to the user's prompt as best as you can based on the conversation history."
response = get_llm_response(llm_input, system_message_content)
# Store the system response in the chat history collection
store_chat_message(session_id, "system", response)
return response
# Helper function to get the LLM response
def get_llm_response(messages, system_message_content):
# Add the system message to the messages list
system_message = {
"role": "system",
"content": system_message_content,
}
# If the system message should go at the end (for context-based queries)
if any(msg.get("role") == "system" for msg in messages):
messages.append(system_message)
else:
# For general queries, put system message at beginning
messages = [system_message] + messages
# Get response from LLM
response = openai_client.chat.completions.create(
model=OPENAI_MODEL,
messages=messages
).choices[0].message.content
return response
7

最后,在项目中创建一个名为 main.py 的文件。此文件运行代理并允许您与其交互。

将以下代码复制并粘贴到 main.py文件中。

from config import mongo_client
from ingest_data import ingest_data
from planning import generate_response
if __name__ == "__main__":
try:
run_ingest = input("Ingest sample data? (y/n): ")
if run_ingest.lower() == 'y':
ingest_data()
session_id = input("Enter a session ID: ")
while True:
user_query = input("\nEnter your query (or type 'quit' to exit): ")
if user_query.lower() == 'quit':
break
if not user_query.strip():
print("Query cannot be empty. Please try again.")
continue
answer = generate_response(session_id, user_query)
print("\nAnswer:")
print(answer)
finally:
mongo_client.close()

保存项目,然后运行以下命令。当您运行代理时:

  • 如果还没有,请指示代理引入示例数据。

  • 请输入会话 ID,开始新会话或继续现有会话。

  • 提出问题。代理会根据您的工具、上一个交互以及在规划阶段定义的提示生成响应。

请参阅示例输出,获取示例交互:

python main.py
Ingest sample data? (y/n): y
Successfully split PDF into 104 chunks.
Generating embeddings and ingesting documents...
Inserted 104 documents into the collection.
Search index 'vector_index' creation initiated.
Polling to check if the index is ready. This may take up to a minute.
vector_index is ready for querying.
Enter a session ID: 123
Enter your query (or type 'quit' to exit): What was MongoDB's latest acquisition?
Tool selected: vector_search_tool
Answer:
MongoDB's latest acquisition was Voyage AI.
Enter your query (or type 'quit' to exit): What do they do?
Tool selected: vector_search_tool
Answer:
Voyage AI is a company that specializes in state-of-the-art embedding and reranking models designed to power next-generation AI applications. These technologies help organizations build more advanced and trustworthy AI capabilities.
Enter your query (or type 'quit' to exit): What is 123+456?
Tool selected: calculator_tool
Answer:
579

提示

如果您使用的是Atlas ,则可以导航到 ai_agent_db.embeddings 命名空间在Atlas用户界面中。以验证嵌入和交互。

8

现在您已经有了一个基本的 AI 代理,可以通过以下方式继续开发:

  • 提升向量搜索工具的性能微调 RAG 管道。

  • 为代理添加更多工具,例如混合全文搜索工具。

  • 通过使用更高级的提示和 LLM 调用来完善规划阶段。

  • 使用MongoDB SearchMongoDB 向量搜索 来存储和检索跨会话的重要交互,从而实现长期记忆和更高级的记忆系统。

如需更多关于使用 MongoDB 构建 AI 代理的教程,请参阅下表: