Puedes integrar MongoDB Atlas con LangGraph para crear agentes de IA. Este tutorial muestra cómo crear un agente de IA que responde preguntas sobre datos de muestra en MongoDB.
Específicamente, el agente utiliza la integración para implementar RAG agéntico y memoria de agente. Utiliza búsqueda semántica y herramientas de búsqueda de texto completo para recuperar información relevante y responder preguntas sobre los datos. También implementa memoria tanto a corto como a largo plazo utilizando MongoDB, almacenando el historial de conversaciones y las interacciones importantes en colecciones separadas.
El código en esta página compila una aplicación de ejemplo completa. También puedes trabajar con el código como un Python notebook Si prefieres aprender paso a paso.
Requisitos previos
Para completar este tutorial, debes tener lo siguiente:
Uno de los siguientes tipos de clúster de MongoDB:
Un clúster de Atlas ejecutando la versión 6.0.11 de MongoDB, 7.0.2, o posterior. Asegúrate de que tu La dirección IP está incluida en la lista de accesode tu proyecto Atlas.
Una implementación local de Atlas creada usando Atlas CLI. Para obtener más información, consulta Crear una Implementación local de Atlas.
Un clúster de MongoDB Community o Enterprise con Search y Vector Search instalados.
Una clave API de Voyage IA. Para aprender más, consulta Clave API y cliente de Python.
Una llave de API de OpenAI. Debes tener una cuenta de OpenAI con créditos disponibles para las solicitudes de API. Para obtener más información sobre cómo registrar una cuenta de OpenAI, consulta el sitio web de la API de OpenAI.
Nota
Consulta los requisitos del paquete langchain-voyageai para asegurar que esté utilizando una versión compatible de Python.
Configurar el entorno
Para configurar el entorno, complete los siguientes pasos:
Inicialice el proyecto e instale las dependencias.
Cree un nuevo directorio de proyecto y luego instale las dependencias necesarias:
mkdir langgraph-mongodb-ai-agent cd langgraph-mongodb-ai-agent pip install --quiet --upgrade python-dotenv langgraph langgraph-checkpoint-mongodb langgraph-store-mongodb langchain langchain-mongodb langchain-voyageai langchain-openai pymongo
Nota
Su proyecto utilizará la siguiente estructura:
langgraph-mongodb-ai-agent ├── .env ├── config.py ├── search-tools.py ├── memory-tools.py ├── agent.py ├── main.py
Establecer variables de entorno.
Crear un(a) .env archivo en su proyecto y especifique las siguientes variables. Sustituya los valores de los marcadores de posición por claves válidas de API y la cadena de conexión de su clúster de MongoDB.
VOYAGE_API_KEY = "<voyage-api-key>" OPENAI_API_KEY = "<openai-api-key>" MONGODB_URI = "<connection-string>"
Nota
Se debe sustituir <connection-string> por la cadena de conexión del clúster Atlas o de la implementación local de Atlas.
Su cadena de conexión debe usar el siguiente formato:
mongodb+srv://<db_username>:<db_password>@<clusterName>.<hostname>.mongodb.net
Para obtener más información, consulta Conectar a un clúster a través de bibliotecas de clientes.
Su cadena de conexión debe usar el siguiente formato:
mongodb://localhost:<port-number>/?directConnection=true
Para obtener más información, consulta Cadenas de conexión.
Utilice MongoDB como base de datos vectorial
Para configurar MongoDB como una base de datos vectorial para almacenamiento y recuperación, complete los siguientes pasos:
Cargue los datos de muestra.
Para este tutorial, utilizarás uno de nuestros conjuntos de datos de muestra como fuente de datos. Si no lo has hecho aún, completa los pasos para cargar datos de muestra en tu clúster de Atlas.
En concreto, utilizarás el conjunto de datos embedded_movies, que contiene documentos sobre películas, incluidos los vectores de incrustación de sus tramas.
Nota
Si desea usar sus propios datos, consulte LangChain Get Started o Cómo crear incrustaciones vectoriales para aprender cómo introducir incrustaciones vectoriales en Atlas.
Configura el almacén de vectores y los índices.
Cree un archivo llamado config.py en su proyecto. Este archivo configura MongoDB como almacén de vectores para su agente. También crea los índices para habilitar la búsqueda de vectores y las consultas de texto completo en los datos de muestra.
import os from pymongo import MongoClient from langchain_mongodb import MongoDBAtlasVectorSearch from langchain_mongodb.index import create_fulltext_search_index from langchain_voyageai import VoyageAIEmbeddings from langchain_openai import ChatOpenAI from dotenv import load_dotenv # Load environment variables load_dotenv() # Get required environment variables MONGODB_URI = os.getenv("MONGODB_URI") if not MONGODB_URI: raise ValueError("MONGODB_URI environment variable is required") # Initialize models embedding_model = VoyageAIEmbeddings( model="voyage-3-large", output_dimension=2048 ) llm = ChatOpenAI("gpt-4o") # MongoDB setup mongo_client = MongoClient(MONGODB_URI) collection = mongo_client["sample_mflix"]["embedded_movies"] # LangChain vector store setup vector_store = MongoDBAtlasVectorSearch.from_connection_string( connection_string=MONGODB_URI, namespace="sample_mflix.embedded_movies", embedding=embedding_model, text_key="plot", embedding_key="plot_embedding_voyage_3_large", relevance_score_fn="dotProduct", ) # Create indexes on startup print("Setting up vector store and indexes...") try: existing_indexes = list(collection.list_search_indexes()) vector_index_exists = any(idx.get('name') == 'vector_index' for idx in existing_indexes) if vector_index_exists: print("Vector search index already exists, skipping creation...") else: print("Creating vector search index...") vector_store.create_vector_search_index( dimensions=2048, # The dimensions of the vector embeddings to be indexed wait_until_complete=60 # Number of seconds to wait for the index to build (can take around a minute) ) print("Vector search index created successfully!") except Exception as e: print(f"Error creating vector search index: {e}") try: fulltext_index_exists = any(idx.get('name') == 'search_index' for idx in existing_indexes) if fulltext_index_exists: print("Search index already exists, skipping creation...") else: print("Creating search index...") create_fulltext_search_index( collection=collection, field="title", index_name="search_index", wait_until_complete=60 # Number of seconds to wait for the index to build (can take around a minute) ) print("Search index created successfully!") except Exception as e: print(f"Error creating search index: {e}")
Define herramientas de búsqueda
Crea un archivo search_tools.py en tu proyecto. En este archivo, se definen las herramientas de búsqueda que el agente utiliza para realizar agentic RAG.
plot_searchEsta herramienta utiliza el objeto del almacén de vectores como recuperador. En segundo plano, el sistema recuperador ejecuta una MongoDB Vector Search query para recuperar documentos semánticamente similares. La herramienta devuelve los títulos y argumentos de los documentos de películas recuperados.title_searchEsta herramienta utiliza el recuperador de texto completo para recuperar documentos de películas que coinciden con el título especificado. A continuación, devuelve la trama de la película especificada.
from langchain.agents import tool from langchain_mongodb.retrievers.full_text_search import MongoDBAtlasFullTextSearchRetriever from config import vector_store, collection def plot_search(user_query: str) -> str: """ Retrieve information on the movie's plot to answer a user query by using vector search. """ retriever = vector_store.as_retriever( search_type="similarity", search_kwargs={"k": 5} # Retrieve top 5 most similar documents ) results = retriever.invoke(user_query) # Concatenate the results into a string context = "\n\n".join([f"{doc.metadata['title']}: {doc.page_content}" for doc in results]) return context def title_search(user_query: str) -> str: """ Retrieve movie plot content based on the provided title by using full-text search. """ # Initialize the retriever retriever = MongoDBAtlasFullTextSearchRetriever( collection=collection, # MongoDB Collection search_field="title", # Name of the field to search search_index_name="search_index", # Name of the MongoDB Search index top_k=1, # Number of top results to return ) results = retriever.invoke(user_query) for doc in results: if doc: return doc.metadata["fullplot"] else: return "Movie not found" # List of search tools SEARCH_TOOLS = [ plot_search, title_search ]
Nota
Puede definir cualquier herramienta necesaria para realizar una tarea específica. También puede definir herramientas para otros métodos de recuperación, como la búsqueda híbrida o la recuperación de documentos principales.
Define Memory Tools
Crea un archivo memory_tools.py en tu proyecto. En este archivo, se definen las herramientas que el agente puede utilizar para almacenar y recuperar interacciones importantes a lo largo de sesiones, a fin de implementar memoria a largo plazo.
store_memory:Esta herramienta utiliza el almacén MongoDB de LangGraph para almacenar interacciones importantes en una colección MongoDB.retrieve_memory:Esta herramienta utiliza el almacén MongoDB de LangGraph para recuperar interacciones relevantes en función de la consulta mediante la búsqueda semántica.
from langchain.agents import tool from langgraph.store.mongodb import MongoDBStore, create_vector_index_config from config import embedding_model, MONGODB_URI # Vector search index configuration for memory collection index_config = create_vector_index_config( embed=embedding_model, dims=2048, relevance_score_fn="dotProduct", fields=["content"] ) def save_memory(content: str) -> str: """Save important information to memory.""" with MongoDBStore.from_conn_string( conn_string=MONGODB_URI, db_name="sample_mflix", collection_name="memories", index_config=index_config, auto_index_timeout=60 # Wait a minute for vector index creation ) as store: store.put( namespace=("user", "memories"), key=f"memory_{hash(content)}", value={"content": content} ) return f"Memory saved: {content}" def retrieve_memories(query: str) -> str: """Retrieve relevant memories based on a query.""" with MongoDBStore.from_conn_string( conn_string=MONGODB_URI, db_name="sample_mflix", collection_name="memories", index_config=index_config ) as store: results = store.search(("user", "memories"), query=query, limit=3) if results: memories = [result.value["content"] for result in results] return f"Retrieved memories:\n" + "\n".join(memories) return "No relevant memories found." MEMORY_TOOLS = [save_memory, retrieve_memories]
Compile el agente con persistencia
Crea un archivo agent.py en tu proyecto. En este archivo, se crea el grafo que gestiona el flujo de trabajo del agente. Este agente utiliza el componente MongoDB Checkpointer para implementar memoria a corto plazo, permitiendo múltiples conversaciones concurrentes con historiales separados.
El agente utiliza el siguiente flujo de trabajo para responder a las queries:
Inicio: El agente recibe una query.
Nodo de agente: El LLM vinculado a la herramienta analiza la query y determina si se necesitan herramientas.
Nodo de herramientas (si es necesario): ejecuta las herramientas de búsqueda o memoria adecuadas.
Final: El LLM genera una respuesta final utilizando la salida de las herramientas.
La implementación del agente consta de varios componentes:
LangGraphAgent: Clase principal del agente que orquesta el flujo de trabajobuild_graph:Construye el flujo de trabajo LangGraph y configura el puntero de controlMongoDBSaverpara la persistencia de la memoria a corto plazoagent_nodeDelbergador principal de decisiones que procesa mensajes y determina el uso de herramientastools_node:Ejecuta las herramientas solicitadas y devuelve resultadosroute_tools:Función de enrutamiento condicional que determina la dirección del flujo de trabajoexecute: Punto de entrada principal que acepta un parámetrothread_idpara el seguimiento de hilos de conversación
from typing import Annotated, Dict, List from typing_extensions import TypedDict from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.messages import ToolMessage from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langgraph.checkpoint.mongodb import MongoDBSaver from config import llm, mongo_client from search_tools import SEARCH_TOOLS from memory_tools import MEMORY_TOOLS # Define the graph state class GraphState(TypedDict): messages: Annotated[list, add_messages] # Define the LangGraph agent class LangGraphAgent: def __init__(self): # Combine search tools with memory tools self.tools = SEARCH_TOOLS + MEMORY_TOOLS self.tools_by_name = {tool.name: tool for tool in self.tools} # Create prompt template self.prompt = ChatPromptTemplate.from_messages([ ( "system", "You are a helpful AI chatbot." " You are provided with tools to answer questions about movies." " Think step-by-step and use these tools to get the information required to answer the user query." " Do not re-run tools unless absolutely necessary." " If you are not able to get enough information using the tools, reply with I DON'T KNOW." " You have access to the following tools: {tool_names}." ), MessagesPlaceholder(variable_name="messages"), ]) # Provide the tool names to the prompt self.prompt = self.prompt.partial(tool_names=", ".join([tool.name for tool in self.tools])) # Prepare the LLM with tools bind_tools = llm.bind_tools(self.tools) self.llm_with_tools = self.prompt | bind_tools # Build the graph self.app = self._build_graph() def _build_graph(self): """Build and compile the LangGraph workflow.""" # Instantiate the graph graph = StateGraph(GraphState) # Add nodes graph.add_node("agent", self._agent_node) graph.add_node("tools", self._tools_node) # Add edges graph.add_edge(START, "agent") graph.add_edge("tools", "agent") # Add conditional edge graph.add_conditional_edges( "agent", self._route_tools, {"tools": "tools", END: END}, ) # Use the MongoDB checkpointer for short-term memory checkpointer = MongoDBSaver(mongo_client, db_name = "sample_mflix") return graph.compile(checkpointer=checkpointer) def _agent_node(self, state: GraphState) -> Dict[str, List]: """Agent node that processes messages and decides on tool usage.""" messages = state["messages"] result = self.llm_with_tools.invoke(messages) return {"messages": [result]} def _tools_node(self, state: GraphState) -> Dict[str, List]: """Tools node that executes the requested tools.""" result = [] messages = state["messages"] if not messages: return {"messages": result} last_message = messages[-1] if not hasattr(last_message, "tool_calls") or not last_message.tool_calls: return {"messages": result} tool_calls = last_message.tool_calls # Show which tools the agent chose to use tool_names = [tool_call["name"] for tool_call in tool_calls] print(f"🔧 Agent chose to use tool(s): {', '.join(tool_names)}") for tool_call in tool_calls: try: tool_name = tool_call["name"] tool_args = tool_call["args"] tool_id = tool_call["id"] print(f" → Executing {tool_name}") if tool_name not in self.tools_by_name: result.append(ToolMessage(content=f"Tool '{tool_name}' not found", tool_call_id=tool_id)) continue tool = self.tools_by_name[tool_name] observation = tool.invoke(tool_args) result.append(ToolMessage(content=str(observation), tool_call_id=tool_id)) except Exception as e: result.append(ToolMessage(content=f"Tool error: {str(e)}", tool_call_id=tool_id)) return {"messages": result} def _route_tools(self, state: GraphState): """ Uses a conditional_edge to route to the tools node if the last message has tool calls. Otherwise, route to the end. """ messages = state.get("messages", []) if len(messages) > 0: ai_message = messages[-1] else: raise ValueError(f"No messages found in input state to tool_edge: {state}") if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0: return "tools" return END def execute(self, user_input: str, thread_id: str) -> str: """Execute the graph with user input.""" input_data = {"messages": [("user", user_input)]} config = {"configurable": {"thread_id": thread_id}} outputs = list(self.app.stream(input_data, config)) # Get the final answer if outputs: final_output = outputs[-1] for _, value in final_output.items(): if "messages" in value and value["messages"]: return value["messages"][-1].content return "No response generated."
Este grafo incluye los siguientes componentes clave:
Estado del grafo: mantiene los datos compartidos en todo el flujo de trabajo, rastreando los mensajes del agente, incluyendo las consultas de los usuarios, las respuestas de LLM y los resultados de llamadas a herramientas.
Nodo agente: procesa mensajes, invoca el LLM y actualiza el estado con respuestas del LLM
Nodo de herramientas: procesa las llamadas a herramientas y actualiza el historial de conversaciones con los resultados.
Edges, que conectan nodos:
Aristas normales: Ruta desde el inicio hasta el nodo del agente y del agente al nodo de herramientas
Borde condicional: Rutas condicionales dependiendo de si se necesitan herramientas
Persistencia: Utiliza el checkpoint
MongoDBSaverpara guardar el estado de la conversación en hilos específicos, permitiendo memoria a corto plazo entre sesiones. Puede encontrar los datos del hilo en las coleccionescheckpointsycheckpoint_writes.
Tip
Para obtener más información acerca de la persistencia, la memoria a corto plazo y el verificador de puntos de control de MongoDB, consulta los siguientes recursos:
Ejecutar el Agente
Por último, se debe crear un archivo llamado main.py en el proyecto. Este archivo ejecuta el agente y permite interactuar con él.
from agent import LangGraphAgent from config import mongo_client def main(): """LangGraph and MongoDB agent with tools and memory.""" # Initialize agent (indexes are created during config import) agent = LangGraphAgent() thread_id = input("Enter a session ID: ").strip() print("Ask me about movies! Type 'quit' to exit.") try: while True: user_query = input("\nYour question: ").strip() if user_query.lower() == 'quit': break # Get response from agent answer = agent.execute(user_query, thread_id) print(f"\nAnswer: {answer}") finally: mongo_client.close() if __name__ == "__main__": main()
Guardar el proyecto y luego ejecutar el siguiente comando. Cuando se ejecute el agente:
El agente inicializa el almacén de vectores y crea los índices si estos aún no existen.
Puedes ingresar un ID de sesión para iniciar una nueva sesión o continuar una sesión existente. Cada sesión se guarda y siempre puedes retomar una conversación anterior.
Hacer preguntas sobre películas. El agente genera una respuesta basada en tus herramientas e interacciones previas.
La siguiente salida muestra un ejemplo de interacción:
python main.py
Creating vector search index... Vector search index created successfully! Creating search index... Search index created successfully! Enter a session ID: 123 Ask me about movies! Type 'quit' to exit. Your query: What are some movies that take place in the ocean? 🔧 Agent chose to use tool(s): plot_search → Executing plot_search Answer: Here are some movies that take place in the ocean: 1. **20,000 Leagues Under the Sea** - A marine biologist, his daughter, and a mysterious Captain Nemo explore the ocean aboard an incredible submarine. 2. **Deep Rising** - A group of armed hijackers board a luxury ocean liner in the South Pacific Ocean, only to fight man-eating, tentacled sea creatures. ... (truncated) Your query: What is the plot of the Titanic? 🔧 Agent chose to use tool(s): title_search → Executing title_search Answer: The plot of *Titanic* involves the romantic entanglements of two couples aboard the doomed ship's maiden voyage ... (truncated) Your query: What movies are like the movie I just mentioned? 🔧 Agent chose to use tool(s): plot_search → Executing plot_search Answer: Here are some movies similar to *Titanic*: 1. **The Poseidon Adventure** - A group of passengers struggles to survive when their ocean liner capsizes at sea. 2. **Pearl Harbor** - Focused on romance and friendship amidst the backdrop of a historical tragedy, following two best friends and their love lives during wartime. ... (truncated) Your query: I don't like sad movies. 🔧 Agent chose to use tool(s): save_memory → Executing save_memory Answer: Got it—I'll keep that in mind. Let me know if you'd like recommendations that focus more on uplifting or happy themes! (In different session) Enter a session ID: 456 Your query: Recommend me a movie based on what you know about me. 🔧 Agent chose to use tool(s): retrieve_memories → Executing retrieve_memories Answer: Based on what I know about you—you don't like sad movies—I'd recommend a fun, uplifting, or action-packed film. Would you be interested in a comedy, adventure, or family-friendly movie? Your query: Sure! 🔧 Agent chose to use tool(s): plot_search, plot_search, plot_search → Executing plot_search → Executing plot_search → Executing plot_search Answer: Here are some movie recommendations from various uplifting genres that suit your preferences: ### Comedy: 1. **Showtime** (2002): A spoof of buddy cop movies where two very different cops are forced to team up on a new reality-based TV cop show. It's packed with laughs and action! 2. **The Big Bus** (1976): A hilarious disaster film parody featuring a nuclear-powered bus going nonstop from New York to Denver, plagued by absurd disasters. ### Adventure: 1. **Journey to the Center of the Earth** (2008): A scientist, his nephew, and their mountain guide discover a fantastic and dangerous lost world at the earth's core. 2. **Jason and the Argonauts** (1963): One of the most legendary adventures in mythology, brought to life in this epic saga of good versus evil. ### Family-Friendly: 1. **The Incredibles** (2004): A family of undercover superheroes is forced into action to save the world while living in quiet suburban life. 2. **Mary Poppins** (1964): A magical nanny brings joy and transformation to a cold banker's unhappy family. 3. **Chitty Chitty Bang Bang** (1968): A whimsical adventure featuring an inventor, his magical car, and a rescue mission filled with fantasy.