对于AI助手:文档索引位于 https://www.mongodb.com/zh-cn/docs/llms.txt — 通过将 .md 附加到任何URL路径,可以获得所有页面的降价版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

使用 LangGraph 和MongoDB Atlas构建AI助手

您可以将MongoDB Atlas与 LangGraph 集成以构建AI代理。本教程演示如何构建AI代理来回答有关MongoDB中示例数据的问题。

具体来说,代理使用该集成来实现代理 RAG代理内存。它使用语义搜索和全文搜索工具来检索相关信息并回答有关数据的问题。它还通过将对话历史记录和重要交互存储在单独的集合中,使用MongoDB实现短期和长期记忆。

本页上的代码构建了一个完整的示例应用程序。如果您想逐步学习;了解,也可以将代码作为Python笔记本来学习。

如要完成本教程,您必须具备以下条件:

  • 以下MongoDB 集群类型之一:

    • 运行MongoDB6.0.11 、7.0.2 或更高版本的Atlas 集群。确保您的IP解决包含在Atlas项目的访问权限列表中。

    • 使用Atlas CLI创建的本地Atlas部署。要学习;了解更多信息,请参阅创建本地Atlas部署。

    • 安装了Search 和 Vector Search的MongoDB Community或 Enterprise集群。

  • Voyage AI API密钥。要学习;了解更多信息,请参阅API密钥和Python客户端。

  • OpenAI API密钥。您必须拥有一个具有可用于API请求的积分的 OpenAI 帐户。要学习;了解有关注册 OpenAI 帐户的更多信息,请参阅 OpenAI API网站。

注意

检查 langchain-voyageai 包的要求,确保您使用兼容的Python版本。

要设置环境,请完成以下步骤:

1

创建一个新的项目目录,然后安装所需的依赖项:

mkdir langgraph-mongodb-ai-agent
cd langgraph-mongodb-ai-agent
pip install --quiet --upgrade python-dotenv langgraph "langgraph-checkpoint-mongodb>=0.4.0" "langgraph-store-mongodb>=0.3.0" langchain langchain-mongodb langchain-voyageai langchain-openai pymongo

注意

您的项目将使用以下结构:

langgraph-mongodb-ai-agent
├── .env
├── config.py
├── search-tools.py
├── memory-tools.py
├── agent.py
├── main.py
2

在项目中创建一个 .env文件并指定以下变量。将占位符值替换为有效的API密钥和MongoDB集群的连接字符串。

VOYAGE_API_KEY = "<voyage-api-key>"
OPENAI_API_KEY = "<openai-api-key>"
MONGODB_URI = "<connection-string>"

注意

<connection-string> 替换为您的 Atlas 集群或本地部署的连接字符串。

连接字符串应使用以下格式:

mongodb+srv://<db_username>:<db_password>@<clusterName>.<hostname>.mongodb.net

要学习;了解更多信息,请参阅通过客户端库连接到集群。

连接字符串应使用以下格式:

mongodb://localhost:<port-number>/?directConnection=true

要学习;了解更多信息,请参阅连接字符串。

要将MongoDB配置为向量数据库以进行存储和检索,请完成以下步骤:

1

在本教程中,您将使用我们的一个示例数据集作为数据源。如果还没有,请完成将示例数据加载到Atlas 集群的步骤。

具体而言,您将使用 embedded_movies 数据集,该数据集包含有关电影的文档,包括其情节的向量嵌入。

注意

如果您想使用自己的数据,请参阅LangChain 入门如何手动创建vector embeddings,学习如何将vector embeddings引入Atlas。

2

在项目中创建一个名为 config.py 的文件。此文件将MongoDB配置为代理的向量存储。它还创建索引以启用对示例数据的向量搜索和全文搜索查询。

将以下代码复制并粘贴到您的 config.py 文件中。

import os
from pymongo import MongoClient
from langchain_mongodb import MongoDBAtlasVectorSearch
from langchain_mongodb.index import create_fulltext_search_index
from langchain_voyageai import VoyageAIEmbeddings
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Get required environment variables
MONGODB_URI = os.getenv("MONGODB_URI")
if not MONGODB_URI:
raise ValueError("MONGODB_URI environment variable is required")
# Initialize models
embedding_model = VoyageAIEmbeddings(
model="voyage-3-large",
output_dimension=2048
)
llm = ChatOpenAI("gpt-4o")
# MongoDB setup
mongo_client = MongoClient(MONGODB_URI)
collection = mongo_client["sample_mflix"]["embedded_movies"]
# LangChain vector store setup
vector_store = MongoDBAtlasVectorSearch.from_connection_string(
connection_string=MONGODB_URI,
namespace="sample_mflix.embedded_movies",
embedding=embedding_model,
text_key="plot",
embedding_key="plot_embedding_voyage_3_large",
relevance_score_fn="dotProduct",
)
# Create indexes on startup
print("Setting up vector store and indexes...")
try:
existing_indexes = list(collection.list_search_indexes())
vector_index_exists = any(idx.get('name') == 'vector_index' for idx in existing_indexes)
if vector_index_exists:
print("Vector search index already exists, skipping creation...")
else:
print("Creating vector search index...")
vector_store.create_vector_search_index(
dimensions=2048, # The dimensions of the vector embeddings to be indexed
wait_until_complete=60 # Number of seconds to wait for the index to build (can take around a minute)
)
print("Vector search index created successfully!")
except Exception as e:
print(f"Error creating vector search index: {e}")
try:
fulltext_index_exists = any(idx.get('name') == 'search_index' for idx in existing_indexes)
if fulltext_index_exists:
print("Search index already exists, skipping creation...")
else:
print("Creating search index...")
create_fulltext_search_index(
collection=collection,
field="title",
index_name="search_index",
wait_until_complete=60 # Number of seconds to wait for the index to build (can take around a minute)
)
print("Search index created successfully!")
except Exception as e:
print(f"Error creating search index: {e}")

在项目中创建 search_tools.py文件。在此文件中,您可以定义代理用于执行代理 RAG的搜索工具。

将以下代码复制并粘贴到 search_tools.py文件中。

  • plot_search:此工具使用向量存储对象作为检索器。在幕后,检索器运行MongoDB Vector Search查询来检索语义相似的文档。然后,该工具会返回检索到的电影文档的标题和情节。

  • title_search:此工具使用全文搜索检索器来检索与指定电影标题匹配的电影文档。然后,该工具会返回指定电影的剧情。

from langchain.agents import tool
from langchain_mongodb.retrievers.full_text_search import MongoDBAtlasFullTextSearchRetriever
from config import vector_store, collection
@tool
def plot_search(user_query: str) -> str:
"""
Retrieve information on the movie's plot to answer a user query by using vector search.
"""
retriever = vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": 5} # Retrieve top 5 most similar documents
)
results = retriever.invoke(user_query)
# Concatenate the results into a string
context = "\n\n".join([f"{doc.metadata['title']}: {doc.page_content}" for doc in results])
return context
@tool
def title_search(user_query: str) -> str:
"""
Retrieve movie plot content based on the provided title by using full-text search.
"""
# Initialize the retriever
retriever = MongoDBAtlasFullTextSearchRetriever(
collection=collection, # MongoDB Collection
search_field="title", # Name of the field to search
search_index_name="search_index", # Name of the MongoDB Search index
top_k=1, # Number of top results to return
)
results = retriever.invoke(user_query)
for doc in results:
if doc:
return doc.metadata["fullplot"]
else:
return "Movie not found"
# List of search tools
SEARCH_TOOLS = [ plot_search, title_search ]

注意

您可以定义执行特定任务所需的任何工具。您还可以为混合搜索父文档检索等其他检索方法定义工具。

在项目中创建 memory_tools.py文件。在此文件中,您可以定义代理用于存储和检索跨会话的重要交互以实现长期记忆的工具。

将以下代码复制并粘贴到memory_tools.py文件中。

  • store_memory:此工具使用LangGraph MongoDB存储将重要的交互存储在MongoDB集合中。

  • retrieve_memory:此工具使用 LangGraph MongoDB存储,通过使用语义搜索检索基于查询的相关交互。

from langchain.agents import tool
from langgraph.store.mongodb import MongoDBStore, create_vector_index_config
from config import embedding_model, MONGODB_URI
# Vector search index configuration for memory collection
index_config = create_vector_index_config(
embed=embedding_model,
dims=2048,
relevance_score_fn="dotProduct",
fields=["content"]
)
@tool
def save_memory(content: str) -> str:
"""Save important information to memory."""
with MongoDBStore.from_conn_string(
conn_string=MONGODB_URI,
db_name="sample_mflix",
collection_name="memories",
index_config=index_config,
auto_index_timeout=60 # Wait a minute for vector index creation
) as store:
store.put(
namespace=("user", "memories"),
key=f"memory_{hash(content)}",
value={"content": content}
)
return f"Memory saved: {content}"
@tool
def retrieve_memories(query: str) -> str:
"""Retrieve relevant memories based on a query."""
with MongoDBStore.from_conn_string(
conn_string=MONGODB_URI,
db_name="sample_mflix",
collection_name="memories",
index_config=index_config
) as store:
results = store.search(("user", "memories"), query=query, limit=3)
if results:
memories = [result.value["content"] for result in results]
return f"Retrieved memories:\n" + "\n".join(memories)
return "No relevant memories found."
MEMORY_TOOLS = [save_memory, retrieve_memories]

在项目中创建 agent.py文件。在此文件中,您构建用于协调代理工作流程的图表。该代理使用MongoDB Checkpointer 组件来实现短期记忆,允许具有单独历史记录的多个并发对话。

该代理使用以下工作流程来响应查询:

  1. 启动:代理接收用户查询。

  2. 代理节点:工具绑定的 LLM 会分析查询并确定是否需要工具。

  3. 工具节点(如果需要):执行相应的搜索或内存工具。

  4. End:LLM 使用工具的输出生成最终响应。

展示 LangGraph-MongoDB 代理工作流程的图表。

将以下代码复制并粘贴到代理的.py文件中。

代理实施由多个组件组成:

  • LangGraphAgent:协调工作流程的主代理类

  • build_graph:构建 LangGraph 工作流程并配置 MongoDBSaver 检查指针以实现短期内存持久性

  • agent_node:处理消息并决定工具使用的主要决策者

  • tools_node:执行请求的工具并返回结果

  • route_tools:确定工作流程方向的条件路由函数

  • execute:接受用于对话线程跟踪的 thread_id 参数的主入口点

from typing import Annotated, Dict, List
from typing_extensions import TypedDict
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.mongodb import MongoDBSaver
from config import llm, mongo_client
from search_tools import SEARCH_TOOLS
from memory_tools import MEMORY_TOOLS
# Define the graph state
class GraphState(TypedDict):
messages: Annotated[list, add_messages]
# Define the LangGraph agent
class LangGraphAgent:
def __init__(self):
# Combine search tools with memory tools
self.tools = SEARCH_TOOLS + MEMORY_TOOLS
self.tools_by_name = {tool.name: tool for tool in self.tools}
# Create prompt template
self.prompt = ChatPromptTemplate.from_messages([
(
"system",
"You are a helpful AI chatbot."
" You are provided with tools to answer questions about movies."
" Think step-by-step and use these tools to get the information required to answer the user query."
" Do not re-run tools unless absolutely necessary."
" If you are not able to get enough information using the tools, reply with I DON'T KNOW."
" You have access to the following tools: {tool_names}."
),
MessagesPlaceholder(variable_name="messages"),
])
# Provide the tool names to the prompt
self.prompt = self.prompt.partial(tool_names=", ".join([tool.name for tool in self.tools]))
# Prepare the LLM with tools
bind_tools = llm.bind_tools(self.tools)
self.llm_with_tools = self.prompt | bind_tools
# Build the graph
self.app = self._build_graph()
def _build_graph(self):
"""Build and compile the LangGraph workflow."""
# Instantiate the graph
graph = StateGraph(GraphState)
# Add nodes
graph.add_node("agent", self._agent_node)
graph.add_node("tools", self._tools_node)
# Add edges
graph.add_edge(START, "agent")
graph.add_edge("tools", "agent")
# Add conditional edge
graph.add_conditional_edges(
"agent",
self._route_tools,
{"tools": "tools", END: END},
)
# Use the MongoDB checkpointer for short-term memory
checkpointer = MongoDBSaver(mongo_client, db_name = "sample_mflix")
return graph.compile(checkpointer=checkpointer)
def _agent_node(self, state: GraphState) -> Dict[str, List]:
"""Agent node that processes messages and decides on tool usage."""
messages = state["messages"]
result = self.llm_with_tools.invoke(messages)
return {"messages": [result]}
def _tools_node(self, state: GraphState) -> Dict[str, List]:
"""Tools node that executes the requested tools."""
result = []
messages = state["messages"]
if not messages:
return {"messages": result}
last_message = messages[-1]
if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
return {"messages": result}
tool_calls = last_message.tool_calls
# Show which tools the agent chose to use
tool_names = [tool_call["name"] for tool_call in tool_calls]
print(f"🔧 Agent chose to use tool(s): {', '.join(tool_names)}")
for tool_call in tool_calls:
try:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
tool_id = tool_call["id"]
print(f" → Executing {tool_name}")
if tool_name not in self.tools_by_name:
result.append(ToolMessage(content=f"Tool '{tool_name}' not found", tool_call_id=tool_id))
continue
tool = self.tools_by_name[tool_name]
observation = tool.invoke(tool_args)
result.append(ToolMessage(content=str(observation), tool_call_id=tool_id))
except Exception as e:
result.append(ToolMessage(content=f"Tool error: {str(e)}", tool_call_id=tool_id))
return {"messages": result}
def _route_tools(self, state: GraphState):
"""
Uses a conditional_edge to route to the tools node if the last message
has tool calls. Otherwise, route to the end.
"""
messages = state.get("messages", [])
if len(messages) > 0:
ai_message = messages[-1]
else:
raise ValueError(f"No messages found in input state to tool_edge: {state}")
if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
return "tools"
return END
def execute(self, user_input: str, thread_id: str) -> str:
"""Execute the graph with user input."""
input_data = {"messages": [("user", user_input)]}
config = {"configurable": {"thread_id": thread_id}}
outputs = list(self.app.stream(input_data, config))
# Get the final answer
if outputs:
final_output = outputs[-1]
for _, value in final_output.items():
if "messages" in value and value["messages"]:
return value["messages"][-1].content
return "No response generated."

展开此部分,了解代理中使用的 LangGraph 组件的详细信息。

该图表包括以下关键部分:

  • Graph State:维护整个工作流程中的共享数据,跟踪代理的消息,包括用户查询、LLM响应和工具调用结果。

  • Nodes:

    • 代理节点:处理消息、调用 LLM 并使用 LLM 响应更新状态

    • 工具节点:处理工具调用并使用结果更新对话历史记录

  • ,用于连接节点:

    • 普通边:从开始到代理节点以及代理到工具节点的路由

    • 有条件的边缘:根据是否需要工具进行有条件的路由

  • 持久性:使用 MongoDBSaver 检查指针将会话状态保存到特定线程,从而启用跨会话的短期记忆。您可以在 checkpointscheckpoint_writes 集合中找到线程数据。

提示

要学习;了解有关持久性、短期记忆和MongoDB检查点的更多信息,请参阅以下资源:

最后,在项目中创建一个名为 main.py 的文件。此文件运行代理并允许您与其交互。

将以下代码复制并粘贴到 main.py文件中。

from agent import LangGraphAgent
from config import mongo_client
def main():
"""LangGraph and MongoDB agent with tools and memory."""
# Initialize agent (indexes are created during config import)
agent = LangGraphAgent()
thread_id = input("Enter a session ID: ").strip()
print("Ask me about movies! Type 'quit' to exit.")
try:
while True:
user_query = input("\nYour question: ").strip()
if user_query.lower() == 'quit':
break
# Get response from agent
answer = agent.execute(user_query, thread_id)
print(f"\nAnswer: {answer}")
finally:
mongo_client.close()
if __name__ == "__main__":
main()

保存项目,然后运行以下命令。当您运行代理时:

  • 代理会初始化向量存储并创建索引(如果尚不存在)。

  • 您可以输入会话ID来启动新会话或继续现有会话。每个会话都是持久的,您可以随时恢复之前的对话。

  • 询问有关电影的问题。该代理会根据您的工具和之前的交互生成响应。

以下输出演示了一个示例交互:

python main.py
Creating vector search index...
Vector search index created successfully!
Creating search index...
Search index created successfully!
Enter a session ID: 123
Ask me about movies! Type 'quit' to exit.
Your query: What are some movies that take place in the ocean?
🔧 Agent chose to use tool(s): plot_search
→ Executing plot_search
Answer: Here are some movies that take place in the ocean:
1. **20,000 Leagues Under the Sea** - A marine biologist, his daughter, and a mysterious Captain Nemo explore the ocean aboard an incredible submarine.
2. **Deep Rising** - A group of armed hijackers board a luxury ocean liner in the South Pacific Ocean, only to fight man-eating, tentacled sea creatures.
... (truncated)
Your query: What is the plot of the Titanic?
🔧 Agent chose to use tool(s): title_search
→ Executing title_search
Answer: The plot of *Titanic* involves the romantic entanglements of two couples aboard the doomed ship's maiden voyage
... (truncated)
Your query: What movies are like the movie I just mentioned?
🔧 Agent chose to use tool(s): plot_search
→ Executing plot_search
Answer: Here are some movies similar to *Titanic*:
1. **The Poseidon Adventure** - A group of passengers struggles to survive when their ocean liner capsizes at sea.
2. **Pearl Harbor** - Focused on romance and friendship amidst the backdrop of a historical tragedy, following two best friends and their love lives during wartime.
... (truncated)
Your query: I don't like sad movies.
🔧 Agent chose to use tool(s): save_memory
→ Executing save_memory
Answer: Got it—I'll keep that in mind. Let me know if you'd like recommendations that focus more on uplifting or happy themes!
(In different session)
Enter a session ID: 456
Your query: Recommend me a movie based on what you know about me.
🔧 Agent chose to use tool(s): retrieve_memories
→ Executing retrieve_memories
Answer: Based on what I know about you—you don't like sad movies—I'd recommend a fun, uplifting, or action-packed film. Would you be interested in a comedy, adventure, or family-friendly movie?
Your query: Sure!
🔧 Agent chose to use tool(s): plot_search, plot_search, plot_search
→ Executing plot_search
→ Executing plot_search
→ Executing plot_search
Answer: Here are some movie recommendations from various uplifting genres that suit your preferences:
### Comedy:
1. **Showtime** (2002): A spoof of buddy cop movies where two very different cops are forced to team up on a new reality-based TV cop show. It's packed with laughs and action!
2. **The Big Bus** (1976): A hilarious disaster film parody featuring a nuclear-powered bus going nonstop from New York to Denver, plagued by absurd disasters.
### Adventure:
1. **Journey to the Center of the Earth** (2008): A scientist, his nephew, and their mountain guide discover a fantastic and dangerous lost world at the earth's core.
2. **Jason and the Argonauts** (1963): One of the most legendary adventures in mythology, brought to life in this epic saga of good versus evil.
### Family-Friendly:
1. **The Incredibles** (2004): A family of undercover superheroes is forced into action to save the world while living in quiet suburban life.
2. **Mary Poppins** (1964): A magical nanny brings joy and transformation to a cold banker's unhappy family.
3. **Chitty Chitty Bang Bang** (1968): A whimsical adventure featuring an inventor, his magical car, and a rescue mission filled with fantasy.