您可以将MongoDB Vector Search 与 LangChain 集成,以构建生成式AI和 RAG 应用程序。本页概述了 LangChain MongoDB Python集成以及您可以在应用程序中使用的不同组件。
注意
有关组件和方法的完整列表,请参阅 API参考。
有关 JavaScript 集成,请参阅 LangChain JS/TS。
安装和设置
要将MongoDB Vector Search 与 LangChain 结合使用,您必须首先安装 langchain-mongodb
包:
pip install langchain-mongodb
向量存储
MongoDBAtlasVectorSearch
是一个向量存储,允许您在MongoDB中的集合中存储和检索向量检索。您可以使用此组件存储数据中的嵌入,并使用MongoDB Vector Search 进行检索。
此组件需要一个MongoDB Vector Search 索引。
使用
实例化向量存储的最快方法是使用MongoDB 集群或本地部署的连接字符串:
from langchain_mongodb.vectorstores import MongoDBAtlasVectorSearch from langchain_voyageai import VoyageAIEmbeddings # Instantiate the vector store using your MongoDB connection string vector_store = MongoDBAtlasVectorSearch.from_connection_string( connection_string = "<connection-string>", # MongoDB cluster URI namespace = "<database-name>.<collection-name>", # Database and collection name embedding = VoyageAIEmbeddings(), # Embedding model to use index_name = "vector_index", # Name of the vector search index # Other optional parameters... )
该集成还支持其他方法来实例化矢量存储:
使用 MongoDB 客户端:
from langchain_mongodb.vectorstores import MongoDBAtlasVectorSearch from langchain_voyageai import VoyageAIEmbeddings from pymongo import MongoClient # Connect to your MongoDB cluster client = MongoClient("<connection-string>") collection = client["<database-name>"]["<collection-name>"] # Instantiate the vector store vector_store = MongoDBAtlasVectorSearch( collection = collection, # Collection to store embeddings embedding = VoyageAIEmbeddings(), # Embedding model to use index_name = "vector_index" # Name of the vector search index # Other optional parameters... ) 从您创建的文档中:
from langchain_mongodb.vectorstores import MongoDBAtlasVectorSearch from langchain_voyageai import VoyageAIEmbeddings from langchain_core.documents import Document from pymongo import MongoClient # Some documents to embed document_1 = Document(page_content="foo", metadata={"baz": "bar"}) document_2 = Document(page_content="thud", metadata={"bar": "baz"}) docs = [ document_1, document_2 ] # Connect to your MongoDB cluster client = MongoClient("<connection-string>") collection = client["<database-name>"]["<collection-name>"] # Create the vector store from documents vector_store = MongoDBAtlasVectorSearch.from_documents( documents = docs, # List of documents to embed embedding = VoyageAIEmbeddings(), # Embedding model to use collection = collection, # Collection to store embeddings index_name = "vector_index" # Name of the vector search index # Other optional parameters... )
Parameter | 必要性 | 说明 |
---|---|---|
| 必需 | |
| 必需 | 指定用于存储向量嵌入的 MongoDB 命名空间。 例如: |
| 必需 | 要使用的嵌入模型。您可以使用 LangChain 支持的任何嵌入模型。 |
| Optional | MongoDB Vector Search索引的名称。默认为 |
| Optional | 包含文档文本内容的字段名称。默认值为 |
| Optional | 存储嵌入向量的字段名称。默认值为 |
| Optional | 要使用的相似性函数。接受的值为 |
| Optional | 向量维度数。如果设立此值并且集合上没有向量搜索索引,则MongoDB会为您创建索引。 |
| Optional | 用于确定是否在向量索引不存在时自动创建向量索引的标志。默认值为 |
| Optional | 等待自动创建的向量搜索索引准备就绪的超时时间(以秒为单位)。 |
| Optional | 传递给向量存储的附加参数,例如 LangChain 特定的参数。 |
Retrievers
LangChain 检索器 是用于从向量存储中获取相关文档的组件。您可以使用 LangChain 的内置检索器或以下MongoDB检索器从MongoDB查询和检索数据。
Vector Search Retriever
将MongoDB实例化为向量存储后,您可以使用向量存储实例作为检索器,通过MongoDB Vector Search查询数据。
使用
from langchain_mongodb.vectorstores import MongoDBAtlasVectorSearch # Instantiate the vector store vector_store = MongoDBAtlasVectorSearch.from_connection_string( # Vector store arguments... ) # Use the vector store as a retriever retriever = vector_store.as_retriever() # Define your query query = "some search query" # Print results documents = retriever.invoke(query) for doc in documents: print(doc)
全文检索器
MongoDBAtlasFullTextSearchRetriever
是使用MongoDB Search 执行全文搜索的检索器。具体来说,它使用 Lucene 的标准 BM25 算法。
使用
from langchain_mongodb.retrievers.full_text_search import MongoDBAtlasFullTextSearchRetriever # Connect to your MongoDB cluster client = MongoClient("<connection-string>") collection = client["<database-name>"]["<collection-name>"] # Initialize the retriever retriever = MongoDBAtlasFullTextSearchRetriever( collection = collection, # MongoDB Collection in Atlas search_field = "<field-name>", # Name of the field to search search_index_name = "<index-name>" # Name of the search index ) # Define your query query = "some search query" # Print results documents = retriever.invoke(query) for doc in documents: print(doc)
注意
混合搜索检索器
MongoDBAtlasHybridSearchRetriever
是使用倒数排名融合 (RRF) 算法将向量搜索和全文搜索结果相结合的检索器。如要了解更多信息,请参阅如何执行混合搜索。
此检索器需要现有的向量存储、 MongoDB Vector Search Index 和MongoDB Search Index。
使用
from langchain_mongodb.retrievers.hybrid_search import MongoDBAtlasHybridSearchRetriever # Initialize the retriever retriever = MongoDBAtlasHybridSearchRetriever( vectorstore = <vector-store>, # Vector store instance search_index_name = "<index-name>", # Name of the MongoDB Search index top_k = 5, # Number of documents to return fulltext_penalty = 60.0, # Penalty for full-text search vector_penalty = 60.0 # Penalty for vector search ) # Define your query query = "some search query" # Print results documents = retriever.invoke(query) for doc in documents: print(doc)
Parent Document Retriever
MongoDBAtlasParentDocumentRetriever
是一个检索器,它首先查询较小的数据块,然后将较大的父文档返回给 LLM。这种检索类型称为父文档检索。父文档检索可以通过允许对较小的数据块进行更精细的搜索,同时为 LLM 提供父文档的完整上下文,从而改善 RAG 代理和应用程序的响应。
此检索器将父文档和子文档存储在单个 MongoDB 集合中,这支持通过仅计算和索引子文档的嵌入来实现高效检索。
在后台,该检索器会创建如下内容:
MongoDBAtlasVectorSearch 的一个实例,用于处理对子文档的向量搜索查询。
MongoDBDocStore 的一个实例,用于处理父文档的存储和检索。
使用
from langchain_mongodb.retrievers.parent_document import ParentDocumentRetriever from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_voyageai import VoyageAIEmbeddings retriever = MongoDBAtlasParentDocumentRetriever.from_connection_string( connection_string = <connection-string>, # MongoDB cluster URI embedding_model = VoyageAIEmbeddings(), # Embedding model to use child_splitter = RecursiveCharacterTextSplitter(), # Text splitter to use database_name = <database-name>, # Database to store the collection collection_name = <collection-name>, # Collection to store the collection # Additional vector store or parent class arguments... ) # Define your query query = "some search query" # Print results documents = retriever.invoke(query) for doc in documents: print(doc)
自查询检索器
MongoDBAtlasSelfQueryRetriever
是一个查询自身的检索器。检索器使用 LLM进程搜索查询,以确定可能的元数据筛选器,使用筛选器形成结构化向量搜索查询,然后运行该查询以检索最相关的文档。
示例,对于类似这样的查询:“What aretrunk movie from after 2010 with ratings 以上 8?”,检索器可以识别针对 genre
、year
和 rating
字段的筛选器,并使用这些筛选器用于检索与查询匹配的文档的筛选器。
此检索器需要现有的向量存储和MongoDB Vector Search Index。
使用
from langchain_mongodb.retrievers import MongoDBAtlasSelfQueryRetriever from langchain_mongodb import MongoDBAtlasVectorSearch # Given an existing vector store with movies data, define metadata describing the data metadata_field_info = [ AttributeInfo( name="genre", description="The genre of the movie. One of ['science fiction', 'comedy', 'drama', 'thriller', 'romance', 'animated']", type="string", ), AttributeInfo( name="year", description="The year the movie was released", type="integer", ), AttributeInfo( name="rating", description="A 1-10 rating for the movie", type="float" ), ] # Create the retriever from the VectorStore, an LLM and info about the documents retriever = MongoDBAtlasSelfQueryRetriever.from_llm( llm=llm, vectorstore=vector_store, metadata_field_info=metadata_field_info, document_contents="Descriptions of movies", enable_limit=True ) # This example results in the following composite filter sent to $vectorSearch: # {'filter': {'$and': [{'year': {'$lt': 1960}}, {'rating': {'$gt': 8}}]}} print(retriever.invoke("Movies made before 1960 that are rated higher than 8"))
GraphRAG
GraphRAG 是传统 RAG 的替代方法,它将数据结构化为实体及其关系的知识图谱,而不是向量嵌入。基于向量的 RAG 查找在语义上与查询相似的文档,而 GraphRAG 查找与查询相连的实体,并遍历图中的关系以检索相关信息。
这种方法尤其适用于回答基于关系的问题,例如“公司 A 和公司 B 之间有什么联系?”或“谁是 X 先生/女士的经理?”。
MongoDBGraphStore
是 LangChain MongoDB 集成中的一个组件,它允许您通过将实体(节点)及其关系(边)存储在 MongoDB 集合中来实现 GraphRAG。该组件将每个实体存储为一个文档,其中包含引用您集合中其他文档的关系字段。它使用 $graphLookup
聚合阶段执行查询。
使用
from langchain_mongodb import MongoDBGraphStore from langchain_openai import ChatOpenAI # Initialize the graph store graph_store = MongoDBGraphStore( connection_string = "<connection-string>", # MongoDB cluster URI database_name = "<database-name>", # Database to store the graph collection_name = "<collection-name>", # Collection to store the graph entity_extraction_model = ChatOpenAI(), # LLM to extract entities from documents (e.g. OpenAI model) # Other optional parameters... ) # Add documents to the graph docs = [...] # Your documents graph_store.add_documents(docs) # Query the graph query = "Who is the CEO of MongoDB?" answer = graph_store.chat_response(query) print(answer.content)
LLM 缓存
缓存用于存储类似或重复查询的重复响应以避免重新计算,从而优化 LLM 性能。MongoDB为 LangChain 应用程序提供以下缓存。
MongoDB 缓存
MongoDBCache
允许您在MongoDB集合中存储基本缓存。
使用
from langchain_mongodb import MongoDBCache from langchain_core.globals import set_llm_cache set_llm_cache(MongoDBCache( connection_string = "<connection-string>", # MongoDB cluster URI database_name = "<database-name>", # Database to store the cache collection_name = "<collection-name>" # Collection to store the cache ))
语义缓存
语义缓存是一种更高级的缓存形式,它根据用户输入和缓存结果之间的语义相似性检索缓存的提示。
MongoDBAtlasSemanticCache
是一个语义缓存,它使用MongoDB Vector Search 来检索缓存的提示。此组件需要MongoDB Vector Search索引。
使用
from langchain_mongodb import MongoDBAtlasSemanticCache from langchain_core.globals import set_llm_cache from langchain_voyageai import VoyageAIEmbeddings set_llm_cache(MongoDBAtlasSemanticCache( embedding = VoyageAIEmbeddings(), # Embedding model to use connection_string = "<connection-string>", # MongoDB cluster URI database_name = "<database-name>", # Database to store the cache collection_name = "<collection-name>" # Collection to store the cache ))
MongoDB 助手工具包
MongoDB 助手工具包 是一个工具集合,您可以将其传递给 LangGraph React助手 以便它可以与MongoDB资源交互。
可用工具
名称 | 说明 |
---|---|
| 一种用于查询MongoDB 数据库的工具。 |
| 一种用于获取MongoDB 数据库元数据的工具。 |
| 用于获取MongoDB数据库集合名称的工具。 |
| 调用 LLM 来检查数据库查询是否正确的工具。 |
使用
from langchain_openai import ChatOpenAI from langgraph.prebuilt import create_react_agent from langchain_mongodb.agent_toolkit import ( MONGODB_AGENT_SYSTEM_PROMPT, MongoDBDatabase, MongoDBDatabaseToolkit, ) db_wrapper = MongoDBDatabase.from_connection_string( CONNECTION_STRING, database=DB_NAME ) llm = ChatOpenAI(model="gpt-4o-mini", timeout=60) toolkit = MongoDBDatabaseToolkit(db=db_wrapper, llm=llm) system_message = MONGODB_AGENT_SYSTEM_PROMPT.format(top_k=5) test_query = "Which country's customers spent the most?" agent = create_react_agent(llm, toolkit.get_tools(), state_modifier=system_message) agent.step_timeout = 60 events = agent.stream( {"messages": [("user", test_query)]}, stream_mode="values", ) messages = []
注意
文档加载器
文档加载器 是帮助您为 LangChain 应用程序加载数据的工具。
MongoDBLoader
是一个文档加载器,可从 MongoDB 数据库返回文档列表。
使用
from langchain_mongodb.loaders import MongoDBLoader loader = MongoDBLoader.from_connection_string( connection_string = "<connection-string>", # MongoDB cluster URI db_name = "<database-name>", # Database that contains the collection collection_name = "<collection-name>", # Collection to load documents from filter_criteria = { "field": "value" }, # Optional document to specify a filter field_names = ["<field-name>", "..." ], # Optional list of fields to include in document content metadata_names = ["<metadata-field>", "..."] # Optional metadata fields to extract ) docs = loader.load()
注意
聊天记录
MongoDBChatMessageHistory
是一个允许您在 MongoDB 数据库中存储和管理聊天消息历史记录的组件。它可以保存与唯一会话标识符关联的用户和 AI 生成的消息。这对于需要跟踪一段时间内交互的应用程序(例如聊天机器人)非常有用。
使用
from langchain_mongodb.chat_message_histories import MongoDBChatMessageHistory chat_message_history = MongoDBChatMessageHistory( session_id = "<session-id>", # Unique session identifier connection_string = "<connection-string>", # MongoDB cluster URI database_name = "<database-name>", # Database to store the chat history collection_name = "<collection-name>" # Collection to store the chat history ) chat_message_history.add_user_message("Hello") chat_message_history.add_ai_message("Hi")
chat_message_history.messages
[HumanMessage(content='Hello'), AIMessage(content='Hi')]
存储
您可以使用以下自定义数据存储来管理和存储在 MongoDB 中的数据。
文档存储
MongoDBDocStore
是一个自定义键值存储,使用 MongoDB 存储和管理文档。您可以执行 CRUD 操作,就像在任何其他 MongoDB 集合上执行一样。
使用
from pymongo import MongoClient from langchain_mongodb.docstores import MongoDBDocStore # Replace with your MongoDB connection string and namespace connection_string = "<connection-string>" namespace = "<database-name>.<collection-name>" # Initialize the MongoDBDocStore docstore = MongoDBDocStore.from_connection_string(connection_string, namespace)
注意
二进制存储
MongoDBByteStore
是一个自定义数据存储,它使用MongoDB存储和管理二进制数据,特别是以字节表示的数据。 您可以使用键值对执行CRUD操作,其中键是字符串,值是字节序列。
使用
from langchain.storage import MongoDBByteStore # Instantiate the MongoDBByteStore mongodb_store = MongoDBByteStore( connection_string = "<connection-string>", # MongoDB cluster URI db_name = "<database-name>", # Name of the database collection_name = "<collection-name>" # Name of the collection ) # Set values for keys mongodb_store.mset([("key1", b"hello"), ("key2", b"world")]) # Get values for keys values = mongodb_store.mget(["key1", "key2"]) print(values) # Output: [b'hello', b'world'] # Iterate over keys for key in mongodb_store.yield_keys(): print(key) # Output: key1, key2 # Delete keys mongodb_store.mdelete(["key1", "key2"])
注意
其他资源
要学习;了解如何将MongoDB Vector Search 与 LangGraph 集成,请参阅将MongoDB与 LangGraph 集成。
有关交互式Python笔记本,请参阅Docs Notebooks 存储库和生成式AI使用案例存储库。