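You can integrate MongoDB Atlas with LangGraph.js to build AI agents. This tutorial demonstrates how to build an agent that answers questions about your data by using LangGraph.js and MongoDB Vector Search.
Specifically, you perform the following actions:
Set up the environment.
Configure your MongoDB cluster.
Build the agent, including the agent tools.
Add memory to the agent.
Create a server and test the agent.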
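Prerequisites
Before you begin, ensure that you have the following:
One of the following MongoDB cluster types:
An Atlas cluster running MongoDB version 6.0.11, 7.0.2, or later. Ensure that your IP address is included in your Atlas project's access list.
A local Atlas deployment created using the Atlas CLI. To learn more, see Create a Local Atlas Deployment.
A MongoDB Community or Enterprise cluster with Search and Vector Search installed.
A Voyage AI API key. To create an account and API key, see the Voyage AI website.
An OpenAI API key. You must have an OpenAI account with credits available for API requests. To learn more about registering an OpenAI account, see the OpenAI API website.
Note
This tutorial uses models from OpenAI and Voyage AI, but you can modify the code to use the models of your choice.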
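Set Up the Environment
To set up the environment, complete the following steps:
Initialize the project and install dependencies.
Create a new project directory, then run the following commands in the project to install the required dependencies: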
npm init -y
npm i -D typescript ts-node @types/express @types/node
npx tsc --init
npm i langchain @langchain/core @langchain/openai @langchain/langgraph @langchain/mongodb @langchain/community @langchain/langgraph-checkpoint-mongodb dotenv express mongodb zod
Note
Your project uses the following structure:
├── .env
├── index.ts
├── agent.ts
├── seed-database.ts
├── package.json
├── tsconfig.json
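Create the environment file.
In the project root, create a .env file and add your API keys and connection string:
OPENAI_API_KEY = "<openai-api-key>"
MONGODB_URI = "<connection-string>"
VOYAGEAI_API_KEY = "<voyage-api-key>"
Replace <connection-string> with the connection string for your Atlas cluster or local deployment.
Your connection string should use the following format:
mongodb+srv://<db_username>:<db_password>@<clusterName>.<hostname>.mongodb.net
To learn more, see Connect to a Cluster via Drivers.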
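Every later script reads these three variables, so it can help to fail fast when one is missing. The following is a minimal sketch and not part of the tutorial's code: `requireEnv` is a hypothetical helper, and it takes the environment as a plain object so that you could pass `process.env` to it.

```typescript
// Hypothetical helper (not part of the tutorial): returns the requested variables,
// or throws an error that names every variable that is missing or blank.
type Env = Record<string, string | undefined>;

function requireEnv<K extends string>(env: Env, keys: readonly K[]): Record<K, string> {
  const missing = keys.filter((key) => {
    const value = env[key];
    return value === undefined || value.trim() === "";
  });
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  const result = {} as Record<K, string>;
  for (const key of keys) {
    result[key] = env[key] as string;
  }
  return result;
}

// Variable names match the .env file in this tutorial.
const REQUIRED_KEYS = ["OPENAI_API_KEY", "MONGODB_URI", "VOYAGEAI_API_KEY"] as const;
```

In a script that imports dotenv/config, you could call `requireEnv(process.env, REQUIRED_KEYS)` before creating any clients.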
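Configure the MongoDB Cluster
In this section, you generate sample data and ingest it into your MongoDB cluster to enable vector search over your data.
Create a file to connect to MongoDB.
Create an index.ts file that establishes a connection to your MongoDB cluster: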
import { MongoClient } from "mongodb";
import 'dotenv/config';

const client = new MongoClient(process.env.MONGODB_URI as string);

async function startServer() {
  try {
    await client.connect();
    await client.db("admin").command({ ping: 1 });
    console.log("Pinged your deployment. You successfully connected to MongoDB!");
    // ... rest of the server setup
  } catch (error) {
    console.error("Error connecting to MongoDB:", error);
    process.exit(1);
  }
}

startServer();
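Seed your cluster with sample data.
Create a seed-database.ts script to generate and store sample employee records. This script performs the following actions:
Defines a schema for employee records.
Creates a function that uses the LLM to generate sample employee data.
Processes each record to create a text summary for embedding.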
Generates a vector embedding for each summary by calling the Voyage AI API, then stores each document with its embedding in the hr_database.employees namespace of your MongoDB cluster.
import { ChatOpenAI } from "@langchain/openai";
import { StructuredOutputParser } from "@langchain/core/output_parsers";
import { MongoClient } from "mongodb";
import { z } from "zod";
import "dotenv/config";

// LLM used to generate the synthetic employee records
const llm = new ChatOpenAI({
  modelName: "gpt-4o-mini",
  temperature: 0.7,
});

// Schema for a single employee record
const EmployeeSchema = z.object({
  employee_id: z.string(),
  first_name: z.string(),
  last_name: z.string(),
  date_of_birth: z.string(),
  address: z.object({
    street: z.string(),
    city: z.string(),
    state: z.string(),
    postal_code: z.string(),
    country: z.string(),
  }),
  contact_details: z.object({
    email: z.string().email(),
    phone_number: z.string(),
  }),
  job_details: z.object({
    job_title: z.string(),
    department: z.string(),
    hire_date: z.string(),
    employment_type: z.string(),
    salary: z.number(),
    currency: z.string(),
  }),
  work_location: z.object({
    nearest_office: z.string(),
    is_remote: z.boolean(),
  }),
  reporting_manager: z.string().nullable(),
  skills: z.array(z.string()),
  performance_reviews: z.array(
    z.object({
      review_date: z.string(),
      rating: z.number(),
      comments: z.string(),
    })
  ),
  benefits: z.object({
    health_insurance: z.string(),
    retirement_plan: z.string(),
    paid_time_off: z.number(),
  }),
  emergency_contact: z.object({
    name: z.string(),
    relationship: z.string(),
    phone_number: z.string(),
  }),
  notes: z.string(),
});

type Employee = z.infer<typeof EmployeeSchema>;

// Parser that validates the LLM output against an array of employee records
const parser = StructuredOutputParser.fromZodSchema(z.array(EmployeeSchema));

// Asks the LLM for 10 fictional employee records and parses the response
async function generateSyntheticData(): Promise<Employee[]> {
  const prompt = `You are a helpful assistant that generates employee data. Generate 10 fictional employee records. Each record should include the following fields: employee_id, first_name, last_name, date_of_birth, address, contact_details, job_details, work_location, reporting_manager, skills, performance_reviews, benefits, emergency_contact, notes. Ensure variety in the data and realistic values.
${parser.getFormatInstructions()}`;
  console.log("Generating synthetic data...");
  const response = await llm.invoke(prompt);
  return parser.parse(response.content as string);
}

// Builds the text summary that is embedded for each employee
async function createEmployeeSummary(employee: Employee): Promise<string> {
  const jobDetails = `${employee.job_details.job_title} in ${employee.job_details.department}`;
  const skills = employee.skills.join(", ");
  const performanceReviews = employee.performance_reviews
    .map(
      (review) =>
        `Rated ${review.rating} on ${review.review_date}: ${review.comments}`
    )
    .join(" ");
  const basicInfo = `${employee.first_name} ${employee.last_name}, born on ${employee.date_of_birth}`;
  const workLocation = `Works at ${employee.work_location.nearest_office}, Remote: ${employee.work_location.is_remote}`;
  const notes = employee.notes;
  return `${basicInfo}. Job: ${jobDetails}. Skills: ${skills}. Reviews: ${performanceReviews}. Location: ${workLocation}. Notes: ${notes}`;
}

// Requests embeddings for the given records from the Voyage AI API
const fetchEmbeddings = async (records: { pageContent: string }[]) => {
  const apiUrl = "https://api.voyageai.com/v1/embeddings";
  const apiKey = process.env.VOYAGEAI_API_KEY;
  const inputs = records.map((record) => record.pageContent);
  const requestBody = {
    input: inputs,
    model: "voyage-3.5",
  };
  try {
    const response = await fetch(apiUrl, {
      method: "POST",
      headers: {
        Authorization: `Bearer ${apiKey}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify(requestBody),
    });
    if (!response.ok) {
      throw new Error(`Error: ${response.status} ${response.statusText}`);
    }
    const data = await response.json();
    return data;
  } catch (error) {
    console.error("Error while fetching embeddings:", error);
    // Rethrow so that the caller never reads fields of an undefined result
    throw error;
  }
};

async function seedDatabase(): Promise<void> {
  const client = new MongoClient(process.env.MONGODB_URI as string);
  try {
    await client.connect();
    await client.db("admin").command({ ping: 1 });
    console.log("Pinged your deployment. You successfully connected to MongoDB!");

    const db = client.db("hr_database");
    const collection = db.collection("employees");
    // Start from an empty collection on every run
    await collection.deleteMany({});

    const syntheticData = await generateSyntheticData();
    const recordsWithSummaries = await Promise.all(
      syntheticData.map(async (record) => ({
        pageContent: await createEmployeeSummary(record),
        metadata: { ...record },
      }))
    );

    // Embed and insert one record at a time
    for (const record of recordsWithSummaries) {
      const embedding = await fetchEmbeddings([record]);
      const enrichedRecord = {
        pageContent: record.pageContent,
        metadata: record.metadata,
        embedding: embedding.data[0].embedding,
      };
      const result = await collection.insertOne(enrichedRecord);
      console.log("Successfully added database record:", result);
    }
  } catch (error) {
    console.error("Error seeding database:", error);
  } finally {
    // Close the connection even if seeding fails
    await client.close();
  }
}

seedDatabase().catch(console.error);
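The seed script reads each vector from `embedding.data[0].embedding`. A minimal sketch of a defensive reader for that response shape follows. `extractEmbeddings` is a hypothetical helper, and the only response fields it assumes are the ones the script already uses (`data[i].embedding`).

```typescript
// Hypothetical helper: pulls the vectors out of an embeddings response and
// checks that there is exactly one numeric vector per input text.
function extractEmbeddings(response: unknown, expectedCount: number): number[][] {
  const data = (response as { data?: unknown } | null | undefined)?.data;
  if (!Array.isArray(data)) {
    throw new Error("Embeddings response has no data array");
  }
  if (data.length !== expectedCount) {
    throw new Error(`Expected ${expectedCount} embeddings but received ${data.length}`);
  }
  return data.map((item, index) => {
    const embedding = (item as { embedding?: unknown } | null | undefined)?.embedding;
    const isVector =
      Array.isArray(embedding) && embedding.every((value) => typeof value === "number");
    if (!isVector) {
      throw new Error(`Item ${index} does not contain a numeric embedding`);
    }
    return embedding as number[];
  });
}
```

A check like this turns a malformed or partial response into a clear error at the point of the API call instead of a later failure during the insert.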
Run the seed script.
npx ts-node seed-database.ts
Pinged your deployment. You successfully connected to MongoDB!
Generating synthetic data...
Successfully added database record: {
  acknowledged: true,
  insertedId: new ObjectId('685d89d966545cfb242790f0')
}
Successfully added database record: {
  acknowledged: true,
  insertedId: new ObjectId('685d89d966545cfb242790f1')
}
Successfully added database record: {
  acknowledged: true,
  insertedId: new ObjectId('685d89da66545cfb242790f2')
}
Successfully added database record: {
  acknowledged: true,
  insertedId: new ObjectId('685d89da66545cfb242790f3')
}
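Tip
After you run the script, you can view the seeded data in your MongoDB cluster by navigating to the hr_database.employees namespace in the Atlas UI.
Create the MongoDB Vector Search index.
Follow the steps to create a MongoDB Vector Search index for the hr_database.employees namespace. Name the index vector_index and specify the following index definition: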
{
  "fields": [
    {
      "numDimensions": 1024,
      "path": "embedding",
      "similarity": "cosine",
      "type": "vector"
    }
  ]
}
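The index definition sets similarity to cosine and numDimensions to 1024, so every stored vector and every query vector must have 1024 elements. To illustrate what the cosine setting compares, here is a minimal sketch in plain TypeScript. `cosineSimilarity` is a hypothetical helper for illustration only; the score that $vectorSearch reports is computed by MongoDB and may be normalized differently from this raw value.

```typescript
// Cosine similarity: dot(a, b) / (|a| * |b|). The raw value ranges from -1 to 1.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) {
    throw new Error(`Dimension mismatch: ${a.length} vs ${b.length}`);
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) {
    throw new Error("Cosine similarity is undefined for a zero vector");
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}
```

Vectors that point in the same direction score 1 regardless of their length, which is why cosine is a common choice for text embeddings.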
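Build the Agent
In this section, you build a graph that orchestrates the agent's workflow. The graph defines the sequence of steps that the agent takes to respond to a query.
Create the agent.ts file.
In your project, create a new file named agent.ts, then add the following code to begin setting up the agent. You add more code to the async function in subsequent steps.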
import { ChatOpenAI } from "@langchain/openai";
import { AIMessage, BaseMessage, HumanMessage } from "@langchain/core/messages";
import { ChatPromptTemplate, MessagesPlaceholder } from "@langchain/core/prompts";
import { StateGraph } from "@langchain/langgraph";
import { Annotation } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
import { ToolNode } from "@langchain/langgraph/prebuilt";
import { MongoDBSaver } from "@langchain/langgraph-checkpoint-mongodb";
import { MongoDBAtlasVectorSearch } from "@langchain/mongodb";
import { MongoClient } from "mongodb";
import { z } from "zod";
import "dotenv/config";

export async function callAgent(client: MongoClient, query: string, thread_id: string) {
  // Define the MongoDB database and collection
  const dbName = "hr_database";
  const db = client.db(dbName);
  const collection = db.collection("employees");

  // Define the graph state: a list of messages that each node appends to.
  // NOTE: this definition does not appear in the original snippet. It is inferred from the
  // Annotation and BaseMessage imports and from the later references to GraphState.
  const GraphState = Annotation.Root({
    messages: Annotation<BaseMessage[]>({
      reducer: (x, y) => x.concat(y),
    }),
  });

  // ... (Add rest of code here)
}
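The nodes defined later in this tutorial return partial updates such as `{ messages: [result] }` rather than the full conversation. That pattern works only if the messages channel of the graph state appends each update to the existing list instead of overwriting it. The following minimal sketch models that append behavior in plain TypeScript without LangGraph.js; the `Message` type and the `applyUpdate` function are hypothetical stand-ins, not LangGraph.js APIs.

```typescript
// Hypothetical stand-ins for the message and state types used by the graph.
type Message = { role: "human" | "ai" | "tool"; content: string };
type State = { messages: Message[] };

// Reducer for the messages channel: keep the history and append the update.
const appendMessages = (current: Message[], update: Message[]): Message[] =>
  current.concat(update);

// Applies a node's partial update to the state without mutating the previous state.
function applyUpdate(state: State, update: Partial<State>): State {
  return {
    messages: update.messages
      ? appendMessages(state.messages, update.messages)
      : state.messages,
  };
}
```

With a reducer like this, each node only needs to return the messages it produced, and the conversation history accumulates across steps.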
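Define the tools.
Add the following code to define a tool and a tool node that use MongoDB Vector Search to retrieve relevant employee information by querying the vector store: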
// Runs a $vectorSearch query and returns the n most similar employee summaries
const executeQuery = async (embedding: number[], n: number) => {
  try {
    const agg = [
      {
        '$vectorSearch': {
          'index': 'vector_index',
          'path': 'embedding',
          'queryVector': embedding,
          'numCandidates': 150,
          'limit': n
        }
      },
      {
        '$project': {
          '_id': 0,
          'pageContent': 1,
          'score': { '$meta': 'vectorSearchScore' }
        }
      }
    ];
    // Reuse the collection defined at the top of callAgent instead of
    // opening a new, never-closed client for every query
    const result = await collection.aggregate(agg).toArray();
    return result;
  } catch (error) {
    console.log("Error while querying:", error);
    // Rethrow so that the failure is surfaced to the caller
    throw error;
  }
};

// Requests an embedding for the query text from the Voyage AI API
const fetchEmbeddings = async (query: string) => {
  const apiUrl = "https://api.voyageai.com/v1/embeddings";
  const apiKey = process.env.VOYAGEAI_API_KEY;
  const requestBody = {
    input: query,
    model: "voyage-3.5",
  };
  try {
    const response = await fetch(apiUrl, {
      method: "POST",
      headers: {
        Authorization: `Bearer ${apiKey}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify(requestBody),
    });
    if (!response.ok) {
      throw new Error(`Error: ${response.status} ${response.statusText}`);
    }
    const data = await response.json();
    return data.data[0].embedding;
  } catch (error) {
    console.error("Error while fetching embedding:", error);
    // Rethrow so that an undefined embedding is never passed to the query
    throw error;
  }
};

// Tool that the agent calls to look up employees by semantic similarity
const employeeLookupTool = tool(
  async ({ query, n = 10 }) => {
    console.log("Employee lookup tool called");
    const embedding = await fetchEmbeddings(query);
    const response = await executeQuery(embedding, n);
    const result = JSON.stringify(response);
    return result;
  },
  {
    name: "employee_lookup",
    description: "Gathers employee details from the HR database",
    schema: z.object({
      query: z.string().describe("The search query"),
      n: z.number().optional().default(10).describe("Number of results to return"),
    }),
  }
);

const tools = [employeeLookupTool];
const toolNode = new ToolNode<typeof GraphState.State>(tools);
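The query code fixes numCandidates at 150 while the model chooses the limit n. A minimal sketch of deriving both values from the requested limit follows. `buildVectorSearchPipeline` is a hypothetical helper; the index name, path, and projected fields come from the tutorial's code, while the 20x multiplier and the 10,000 cap are assumptions that you should check against the $vectorSearch documentation for your MongoDB version.

```typescript
type PipelineStage = Record<string, unknown>;

// Hypothetical helper: builds the aggregation pipeline for a vector query.
function buildVectorSearchPipeline(queryVector: number[], limit: number): PipelineStage[] {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new Error("limit must be a positive integer");
  }
  // Assumption: consider about 20 candidates per returned document, capped at 10,000,
  // and never fewer candidates than the limit itself.
  const numCandidates = Math.max(limit, Math.min(limit * 20, 10000));
  return [
    {
      $vectorSearch: {
        index: "vector_index",
        path: "embedding",
        queryVector,
        numCandidates,
        limit,
      },
    },
    {
      $project: {
        _id: 0,
        pageContent: 1,
        score: { $meta: "vectorSearchScore" },
      },
    },
  ];
}
```

Keeping the pipeline in a pure function also makes it possible to unit test the query shape without a database connection.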
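Define additional functions.
Add the following code to define the functions that the agent uses to process messages and to determine whether to continue the conversation:
This function configures how the agent uses the LLM:
Constructs a prompt template with system instructions and the conversation history.
Formats the prompt with the current time, available tools, and messages.
Invokes the LLM to generate the next response.
Returns the model's response to be added to the conversation state.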
// NOTE: `model` is not defined in the snippets shown in this tutorial. This line is an
// assumption: a chat model with the tools bound to it so that it can emit tool calls.
// The model name mirrors the one used in the seed script.
const model = new ChatOpenAI({ modelName: "gpt-4o-mini" }).bindTools(tools);

async function callModel(state: typeof GraphState.State) {
  const prompt = ChatPromptTemplate.fromMessages([
    [
      "system",
      `You are a helpful AI assistant, collaborating with other assistants. Use the provided tools to progress towards answering the question. If you are unable to fully answer, that's OK, another assistant with different tools will help where you left off. Execute what you can to make progress. If you or any of the other assistants have the final answer or deliverable, prefix your response with FINAL ANSWER so the team knows to stop. You have access to the following tools: {tool_names}.\n{system_message}\nCurrent time: {time}.`,
    ],
    new MessagesPlaceholder("messages"),
  ]);

  // Fill in the template variables and the conversation history
  const formattedPrompt = await prompt.formatMessages({
    system_message: "You are helpful HR Chatbot Agent.",
    time: new Date().toISOString(),
    tool_names: tools.map((tool) => tool.name).join(", "),
    messages: state.messages,
  });

  const result = await model.invoke(formattedPrompt);
  // Return only the new message; the state appends it to the history
  return { messages: [result] };
}
This function determines whether the agent should continue or end the conversation:
If the message contains tool calls, routes the flow to the tools node.
Otherwise, ends the conversation and returns the final response.
function shouldContinue(state: typeof GraphState.State) {
  const messages = state.messages;
  const lastMessage = messages[messages.length - 1] as AIMessage;
  // Route to the tools node if the model requested any tool calls
  if (lastMessage.tool_calls?.length) {
    return "tools";
  }
  return "__end__";
}
Define the agent's workflow.
Add the following code to define the sequence of steps that the agent takes to respond to a query.
const workflow = new StateGraph(GraphState)
  .addNode("agent", callModel)
  .addNode("tools", toolNode)
  .addEdge("__start__", "agent")
  .addConditionalEdges("agent", shouldContinue)
  .addEdge("tools", "agent");
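Specifically, the agent performs the following steps:
The agent receives a user query.
In the agent node, the agent processes the query and determines whether to use a tool or end the conversation.
If a tool is needed, the agent routes to the tools node, where it executes the selected tool. The result from the tool is sent back to the agent node.
The agent interprets the tool's output and either forms a response or decides on the next action.
This continues until the agent determines that no further action is needed (the shouldContinue function returns "__end__").
For this agent, you define two custom nodes:
Agent node: This node processes the messages in the current state, invokes the LLM with these messages, and updates the state with the LLM's response, including any tool calls.
Tools node: This node processes tool calls, determines the appropriate tool to use based on the current state, and updates the conversation history with the results of the tool call.
You also define edges that connect the nodes in the graph and determine the agent's flow. In this code, you define the following edges: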
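Two normal edges that route:
Start node to the agent node.
Tools node to the agent node.
A conditional edge that routes the flow based on the output of the agent node. If the agent node determines that a tool is needed, it routes to the tools node. Otherwise, it ends the conversation.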
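The control flow described in this section can be sketched as a plain loop. This is a simplified, synchronous model for illustration only and not how LangGraph.js executes a graph: `runAgentLoop`, the `AgentReply` type, and the callback parameters are hypothetical, and `maxSteps` plays a role loosely similar to a recursion limit.

```typescript
type ToolCall = { name: string; args: string };
type AgentReply = { content: string; toolCalls: ToolCall[] };
type LoopMessage = { from: "user" | "agent" | "tool"; content: string };

// Hypothetical model of the agent/tools cycle: agent -> (tools -> agent)* -> end.
function runAgentLoop(
  query: string,
  callModel: (history: LoopMessage[]) => AgentReply,
  runTool: (call: ToolCall) => string,
  maxSteps = 15
): LoopMessage[] {
  const history: LoopMessage[] = [{ from: "user", content: query }];
  for (let step = 0; step < maxSteps; step++) {
    // Agent node: ask the model what to do next.
    const reply = callModel(history);
    history.push({ from: "agent", content: reply.content });
    // Conditional edge: no tool calls means the conversation ends.
    if (reply.toolCalls.length === 0) {
      return history;
    }
    // Tools node: run each requested tool, then loop back to the agent node.
    for (const call of reply.toolCalls) {
      history.push({ from: "tool", content: runTool(call) });
    }
  }
  throw new Error(`Agent did not finish within ${maxSteps} steps`);
}
```

The step cap matters because a model that keeps requesting tools would otherwise cycle between the two nodes indefinitely.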
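Add Memory to the Agent
To improve the agent's performance, you can persist its state by using the MongoDB Checkpointer. Persistence allows the agent to store information about previous interactions, which the agent can use in future interactions to provide more contextually relevant responses.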
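Conceptually, a checkpointer stores the graph state under a thread ID and loads it again when the same thread ID is used. The following minimal sketch models that idea with an in-memory Map. `InMemoryThreadStore` is a hypothetical class for illustration and is not the MongoDBSaver implementation, which persists its data in MongoDB rather than in process memory.

```typescript
type StoredMessage = { role: "human" | "ai"; content: string };

// Hypothetical in-memory model of per-thread persistence.
class InMemoryThreadStore {
  private threads = new Map<string, StoredMessage[]>();

  // Returns a copy of the history for a thread, or an empty history for a new thread.
  load(threadId: string): StoredMessage[] {
    return [...(this.threads.get(threadId) ?? [])];
  }

  // Appends new messages to a thread's history and returns the updated history.
  append(threadId: string, messages: StoredMessage[]): StoredMessage[] {
    const updated = this.load(threadId).concat(messages);
    this.threads.set(threadId, updated);
    return [...updated];
  }
}
```

Because each thread ID maps to its own history, two conversations can run against the same agent without seeing each other's messages, and a follow-up request with an existing thread ID picks up where that conversation left off.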
Complete the agent function.
Finally, add the following code to complete the agent function that handles queries:
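// Persist graph state in MongoDB so that each thread_id resumes its own conversation.
// NOTE: `app` is not defined in the snippets shown in this tutorial. The next two lines are
// an assumption based on the MongoDBSaver import and the dbName variable defined earlier.
const checkpointer = new MongoDBSaver({ client, dbName });
const app = workflow.compile({ checkpointer });

// Run the graph for this query on the given conversation thread
const finalState = await app.invoke(
  {
    messages: [new HumanMessage(query)],
  },
  { recursionLimit: 15, configurable: { thread_id: thread_id } }
);

// The last message in the state is the agent's final response
const finalMessage = finalState.messages[finalState.messages.length - 1];
console.log(finalMessage.content);
return finalMessage.content;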
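Create a Server and Test the Agent
In this section, you create a server to interact with your agent and test its functionality.
Configure the Express.js server.
Replace the contents of your index.ts file with the following code: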
import 'dotenv/config';
import express, { Express, Request, Response } from "express";
import { MongoClient } from "mongodb";
import { callAgent } from './agent';

const app: Express = express();
app.use(express.json());

const client = new MongoClient(process.env.MONGODB_URI as string);

async function startServer() {
  try {
    await client.connect();
    await client.db("admin").command({ ping: 1 });
    console.log("Pinged your deployment. You successfully connected to MongoDB!");

    app.get('/', (req: Request, res: Response) => {
      res.send('LangGraph Agent Server');
    });

    // Start a new conversation and return its thread ID
    app.post('/chat', async (req: Request, res: Response) => {
      const initialMessage = req.body.message;
      const threadId = Date.now().toString();
      try {
        const response = await callAgent(client, initialMessage, threadId);
        res.json({ threadId, response });
      } catch (error) {
        console.error('Error starting conversation:', error);
        res.status(500).json({ error: 'Internal server error' });
      }
    });

    // Continue an existing conversation identified by its thread ID
    app.post('/chat/:threadId', async (req: Request, res: Response) => {
      const { threadId } = req.params;
      const { message } = req.body;
      try {
        const response = await callAgent(client, message, threadId);
        res.json({ response });
      } catch (error) {
        console.error('Error in chat:', error);
        res.status(500).json({ error: 'Internal server error' });
      }
    });

    const PORT = process.env.PORT || 3000;
    app.listen(PORT, () => {
      console.log(`Server running on port ${PORT}`);
    });
  } catch (error) {
    console.error('Error connecting to MongoDB:', error);
    process.exit(1);
  }
}

startServer();
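Test the agent.
Send sample requests to interact with your agent. Your responses vary depending on your data and the models that you use.
Note
The request returns a response in JSON format. You can also view the plaintext output in the terminal where the server is running.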
curl -X POST -H "Content-Type: application/json" -d '{"message": "Build a team to make a web app based on the employee data."}' http://localhost:3000/chat
# Sample response
{"threadId": "1713589087654", "response": "To assemble a web app development team, we ideally need..." (truncated)}

# Plaintext output in the terminal
To assemble a web app development team, we ideally need the following roles:

1. **Software Developer**: To handle the coding and backend.
2. **UI/UX Designer**: To design the application's interface and user experience.
3. **Data Analyst**: For managing, analyzing, and visualizing data if required for the app.
4. **Project Manager**: To coordinate the project tasks and milestones, often providing communication across departments.

### Suitable Team Members for the Project:

#### 1. Software Developer
- **John Doe**
  - **Role**: Software Engineer
  - **Skills**: Java, Python, AWS
  - **Location**: Los Angeles HQ (Remote)
  - **Notes**: Highly skilled developer with exceptional reviews (4.8/5), promoted to Senior Engineer in 2018.

#### 2. Data Analyst
- **David Smith**
  - **Role**: Data Analyst
  - **Skills**: SQL, Tableau, Data Visualization
  - **Location**: Denver Office
  - **Notes**: Strong technical analysis skills. Can assist with app data integration or dashboards.

#### 3. UI/UX Designer
No specific UI/UX designer was identified in the current search. I will need to query this again or look for a graphic designer with some UI/UX skills.

#### 4. Project Manager
- **Emily Davis**
  - **Role**: HR Manager
  - **Skills**: Employee Relations, Recruitment, Conflict Resolution
  - **Location**: Seattle HQ (Remote)
  - **Notes**: Experienced in leadership. Can take on project coordination.

Should I search further for a UI/UX designer, or do you have any other parameters for the team?
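You can continue the conversation by using the thread ID returned in the previous response. For example, to ask a follow-up question, use the following command. Replace <threadId> with the thread ID returned in the previous response.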
curl -X POST -H "Content-Type: application/json" -d '{"message": "Who should lead this project?"}' http://localhost:3000/chat/<threadId>
# Sample response
{"response": "For leading this project, a suitable choice would be someone..." (truncated)}

# Plaintext output in the terminal
### Best Candidate for Leadership:
- **Emily Davis**:
  - **Role**: HR Manager
  - **Skills**: Employee Relations, Recruitment, Conflict Resolution
  - **Experience**:
    - Demonstrated leadership in complex situations, as evidenced by strong performance reviews (4.7/5).
    - Mentored junior associates, indicating capability in guiding a team.
  - **Advantages**:
    - Remote-friendly, enabling flexible communication across team locations.
    - Experience in managing people and processes, which would be crucial for coordinating a diverse team.

**Recommendation:** Emily Davis is the best candidate to lead the project given her proven leadership skills and ability to manage collaboration effectively.

Let me know if you'd like me to prepare a structured proposal or explore alternative options.
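The two curl commands map onto the server's two routes: POST /chat starts a thread, and POST /chat/<threadId> continues one. A minimal sketch of building those requests from TypeScript follows. `buildChatRequest` is a hypothetical helper, and the default base URL assumes that the server is running locally on port 3000, as in the curl examples.

```typescript
type ChatRequest = {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
};

// Hypothetical helper: builds the request for a new or an existing conversation thread.
function buildChatRequest(
  message: string,
  threadId?: string,
  baseUrl = "http://localhost:3000"
): ChatRequest {
  // Without a thread ID the server starts a new conversation.
  const path = threadId ? `/chat/${encodeURIComponent(threadId)}` : "/chat";
  return {
    url: `${baseUrl}${path}`,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message }),
    },
  };
}
```

You could pass the result to `fetch(request.url, request.init)`, read `threadId` from the JSON body of the first response, and supply it as the second argument for follow-up questions.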