How to Improve LLM Applications With Parent Document Retrieval Using MongoDB and LangChain
Chunking, in the context of LLM applications, is the process of breaking down large pieces of text into smaller segments or chunks. Chunking is an important component of any LLM application that involves retrieving data from a knowledge base since it impacts the quality of everything downstream—from embeddings to retrieval, and the generation itself.
The main concern with chunking, however, is that you inevitably lose context in an attempt to keep chunks targeted and focused to maintain embedding quality. This can hurt generation quality since the information required to answer a particular question might get spread across multiple chunks.
This is where a technique called parent document retrieval can help. In this tutorial, we will see how this technique helps retain the benefits of chunking without impacting generation quality. Specifically, we will cover the following:
- What is parent document retrieval and when should you use it?
- How parent document retrieval works in MongoDB
- Implementing parent document retrieval using MongoDB’s LangChain integration
- Using parent document retrieval in retrieval augmented generation (RAG) and agentic workflows
When splitting documents for LLM applications, there are often conflicting considerations:
- Chunks should be small enough so that embeddings can accurately capture their meaning, resulting in good retrieval quality.
- Chunks should be large enough so as to not spread context across multiple chunks, resulting in good generation quality.
This is difficult to achieve with simple strategies that rely on a single pre-defined chunk size, such as fixed-size token splitting with overlap or recursive splitting with overlap. Parent document retrieval aims to strike a balance between the two requirements: it embeds and stores small chunks, but identifies and fetches the source document, or larger chunks, at retrieval time.
The main advantage of this technique is that it provides more complete context to the LLM, resulting in more contextualized responses. Some use cases where context expansion can prove useful are as follows:
- Legal case preparation: Expanding a response about a termination clause with information about dispute resolution and governing law from the same document.
- Documentation chatbots: Answering a question on API authentication with information about token expiration and refresh mechanisms.
- Scientific research: A query about "results from experiment A" expands to include methods, hypotheses, and limitations.
In this tutorial, we will use MongoDB’s LangChain integration, which provides a simple API for parent document retrieval. But first, let’s look at what happens under the hood.
At ingest time, documents are split into small chunks, embedded, and stored in a MongoDB collection. Each chunked document has a parent ID, which is a unique identifier for the parent document that the chunk came from. The parent documents are also stored in the same collection, with the `_id` field matching the parent ID contained in the corresponding document chunks. A visual representation of this process is as follows:
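To make the storage layout concrete, here is a rough sketch of what a parent document and one of its child chunks might look like in the collection. The field names (`doc_id` for the parent ID and `embedding` for the chunk embedding) are illustrative assumptions and may differ from the defaults used by the integration:

```python
# Illustrative only: approximate shape of the stored documents (field names are assumptions)
parent_document = {
    "_id": "9f2c4a",  # unique identifier of the parent document
    "page_content": "Full text of the original document...",
    "title": "View Database Access History",
}

child_chunk = {
    "_id": "b7e01d",
    "doc_id": "9f2c4a",  # parent ID pointing back to the parent document's _id
    "page_content": "A small chunk of the parent document...",
    "embedding": [0.01, -0.02, 0.03],  # vector embedding of the chunk (truncated)
}
```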
At retrieval time, the user query is embedded and the relevant chunks are retrieved using semantic search. A `$lookup` operation in MongoDB, akin to a left outer join, is performed to obtain the parent documents of the retrieved chunks from the same collection. The chunks themselves and any duplicate parent documents are then dropped, and unique parent documents are passed on to the LLM as context to answer the user query. All of this is achieved using MongoDB’s rich aggregation framework. A visual representation of the retrieval and generation process is as follows:
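Conceptually, the retrieval step resembles the aggregation pipeline sketched below. This is a simplified illustration rather than the exact pipeline the integration generates; `query_embedding` is a placeholder for the embedded user query, and the index, collection, and field names are assumptions:

```python
# Simplified sketch of the retrieval aggregation (not the exact pipeline the integration runs)
pipeline = [
    {
        "$vectorSearch": {  # semantic search over the embedded child chunks
            "index": "vector_index",
            "path": "embedding",
            "queryVector": query_embedding,  # placeholder: embedding of the user query
            "numCandidates": 100,
            "limit": 10,
        }
    },
    {
        "$lookup": {  # fetch each chunk's parent document from the same collection
            "from": "parent_doc",
            "localField": "doc_id",
            "foreignField": "_id",
            "as": "parent",
        }
    },
    {"$unwind": "$parent"},
    # Deduplicate so each parent document is passed to the LLM only once
    {"$group": {"_id": "$parent._id", "parent": {"$first": "$parent"}}},
]
```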
In this tutorial, we will implement parent document retrieval using MongoDB’s LangChain integration, and see how to use it in a RAG application as well as an AI agent. The Jupyter Notebook for this tutorial can be found on GitHub in our GenAI Showcase repository.
We will require the following libraries for this tutorial:
- datasets: Python package to download datasets from Hugging Face
- pymongo: Python driver for MongoDB
- langchain: Python package for LangChain's core modules
- langgraph: Python package to orchestrate LLM workflows as graphs
- langchain-mongodb: Python package to use MongoDB features in LangChain
- langchain-openai: Python package to use OpenAI models via LangChain
```python
! pip install -qU datasets pymongo langchain langgraph langchain-mongodb langchain-openai
```
We will use OpenAI as the embedding as well as chat completion model provider. To use their models, you need to obtain an OpenAI API key and set it as an environment variable:
1 os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API Key:")
We will use MongoDB for parent document retrieval. But first, you will need a MongoDB Atlas account with a database cluster. Once you do that, you will need to get the connection string to connect to your cluster. Follow these steps to get set up:
- Obtain the connection string for your database cluster.
Once you have the connection string, set it in your code, instantiate the MongoDB client, and ensure that you are able to connect to your database using the `ping` command.

```python
from pymongo import MongoClient

MONGODB_URI = getpass.getpass("Enter your MongoDB connection string:")
mongodb_client = MongoClient(
    MONGODB_URI, appname="devrel.showcase.parent_doc_retrieval"
)
mongodb_client.admin.command("ping")
```
We will use a snapshot of MongoDB’s official documentation as the dataset for our tutorial. This dataset is available on Hugging Face. To download this dataset, you will need to request access to it and create a user access token. Follow the steps here to get set up:
- Request access to the dataset. Requests are approved automatically so you should have access to the dataset instantaneously.
Once you have the access token, set it as an environment variable:
1 os.environ["HF_TOKEN"] = getpass.getpass("Enter your HF Access Token:")
First, let’s download the MongoDB Docs dataset from Hugging Face.
```python
from datasets import load_dataset
import pandas as pd

data = load_dataset("mongodb-eai/docs", streaming=True, split="train")
data_head = data.take(1000)
df = pd.DataFrame(data_head)
```
We download the dataset in streaming mode so that we only pull a subset of records instead of downloading the entire dataset to disk.
The easiest way to use your data with LangChain features is by converting it into LangChain document objects (we will refer to these as “documents” in this tutorial). These objects consist of two attributes, namely `page_content` and `metadata`. `page_content`, as the name suggests, corresponds to the content of the document, and `metadata` is basic information about the document that you can customize or that LangChain extracts automatically.

```python
from langchain_core.documents import Document

docs = []
metadata_fields = ["updated", "url", "title"]
for _, row in df.iterrows():
    content = row["body"]
    metadata = row["metadata"]
    for field in metadata_fields:
        metadata[field] = row[field]
    docs.append(Document(page_content=content, metadata=metadata))
```
In the above code, we iterate through the rows of our Docs dataset and create a LangChain document per row. From each row, we extract the `body` field as the `page_content` of the document. We also extract `metadata` and a few other fields such as `url`, `title`, etc. as the `metadata` attribute of the document. An example of a LangChain document object is as follows:
```python
Document(page_content='# View Database Access History\n\n- This feature is not available for `M0` free clusters, `M2`, and `M5` clusters. To learn more, see Atlas M0 (Free Cluster), M2, and M5 Limits', metadata={'contentType': None, 'pageDescription': None, 'productName': 'MongoDB Atlas', 'tags': ['atlas', 'docs'], 'version': None, 'updated': {'$date': '2024-05-20T17:30:49.148Z'}, 'url': 'https://mongodb.com/docs/atlas/access-tracking/', 'title': 'View Database Access History'})
```
Whenever a MongoDB parent document retriever is instantiated using the `from_connection_string` method, it automatically creates an instance of the `MongoDBAtlasVectorSearch` vector store and the `MongoDBDocStore` document store. When documents are added to the retriever, the MongoDB Atlas vector store splits them into chunks (child documents), generates embeddings for the chunks, and ingests them into a MongoDB collection. The MongoDB document store ingests the parent documents into the same collection.

MongoDB Atlas is a unified platform for vector and operational data. This allows the same collection to act as both the vector store and the document store. In most other cases, you would need to use one platform as the vector store and another as the document store.
So let’s first specify the embedding model, the database, and the collection to ingest documents into, and define a helper function for chunking documents.
```python
from langchain_mongodb.retrievers import (
    MongoDBAtlasParentDocumentRetriever,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings

embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")

DB_NAME = "langchain"
COLLECTION_NAME = "parent_doc"

def get_splitter(chunk_size: int) -> RecursiveCharacterTextSplitter:
    """
    Returns a token-based text splitter with overlap

    Args:
        chunk_size (int): Chunk size in number of tokens

    Returns:
        RecursiveCharacterTextSplitter: Recursive text splitter object
    """
    return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        encoding_name="cl100k_base",
        chunk_size=chunk_size,
        chunk_overlap=0.15 * chunk_size,
    )
```
The above code:
- Initializes the embedding model. We are using OpenAI’s `text-embedding-3-small`.
- Specifies the database (`DB_NAME`) and collection (`COLLECTION_NAME`) to ingest data into.
- Defines a function called `get_splitter` for chunking documents. The function takes a `chunk_size` as an argument and returns an object of the `RecursiveCharacterTextSplitter` class. We use the `from_tiktoken_encoder` method of the class, which means texts will first be split by a list of characters and then merged into tokens until the specified `chunk_size` is reached. We also specify a `chunk_overlap` corresponding to 15% of the `chunk_size`.
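To sanity-check the splitter before wiring it into the retriever, you can split one of the documents created earlier and inspect the output. This is optional and shown only for illustration; it assumes the `docs` list from the previous step is in scope:

```python
# Optional sanity check: split a single document and inspect the resulting chunks
splitter = get_splitter(200)
sample_chunks = splitter.split_documents(docs[:1])
print(f"Split 1 document into {len(sample_chunks)} chunks")
print(sample_chunks[0].page_content[:200])
```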
Now, let’s instantiate the MongoDB parent document retriever:
```python
parent_doc_retriever = MongoDBAtlasParentDocumentRetriever.from_connection_string(
    connection_string=MONGODB_URI,
    embedding_model=embedding_model,
    child_splitter=get_splitter(200),
    database_name=DB_NAME,
    collection_name=COLLECTION_NAME,
    text_key="page_content",
    search_kwargs={"k": 10},
)
```
The above code uses the `from_connection_string` method with the following arguments to create an instance of `MongoDBAtlasParentDocumentRetriever`:
- connection_string: Connection string for your MongoDB Atlas cluster.
- embedding_model: Embedding model for the vector store. This was initialized previously.
- child_splitter: Uses the `get_splitter` function to create a text splitter for chunking documents according to the specified chunk size, in this case, 200 tokens.
- database_name: The MongoDB database to ingest parent and child documents into.
- collection_name: The MongoDB collection to ingest parent and child documents into.
- text_key: The field in the chunked documents that contains the raw text. In our documents, it is `page_content`.
- search_kwargs: Additional arguments for the search. We set `k` to 10 to retrieve the top 10 most relevant chunks during the semantic search that precedes parent document retrieval.
- kwargs: Any additional arguments to the parent document retriever.
You can also pass `parent_splitter` as an additional argument to the `from_connection_string` method. The idea here is to first split the raw documents into large chunks and then split those into smaller chunks. At retrieval time, instead of the full parent documents, the larger parent chunks are retrieved. You can instantiate a parent chunk retriever as follows:

```python
parent_chunk_retriever = MongoDBAtlasParentDocumentRetriever.from_connection_string(
    connection_string=MONGODB_URI,
    embedding_model=embedding_model,
    child_splitter=get_splitter(200),
    parent_splitter=get_splitter(800),
    database_name=DB_NAME,
    collection_name=COLLECTION_NAME,
    text_key="page_content",
    search_kwargs={"k": 10},
)
```
In the above example, the retriever will create parent chunks of size 800 tokens and child chunks of size 200 tokens.
We will use the `parent_doc_retriever` for the rest of the tutorial.

Now, let’s ingest documents into MongoDB using the retriever. We will ingest the documents asynchronously, which is especially useful when working with large datasets since you can process multiple batches of data concurrently, speeding up the ingest.
Let’s define some helper functions for the data ingest.
```python
import asyncio
from typing import Generator, List

BATCH_SIZE = 256
MAX_CONCURRENCY = 4

async def process_batch(batch: List[Document], semaphore: asyncio.Semaphore) -> None:
    """
    Ingest batches of documents into MongoDB

    Args:
        batch (List[Document]): Batch of documents to ingest
        semaphore (asyncio.Semaphore): Asyncio semaphore
    """
    async with semaphore:
        await parent_doc_retriever.aadd_documents(batch)
        print(f"Processed {len(batch)} documents")
```
The above code:
- Sets a batch size (`BATCH_SIZE`) that specifies the number of documents to process in a single task, and a concurrency limit (`MAX_CONCURRENCY`) which indicates the maximum number of tasks that can run simultaneously.
- Defines a function called `process_batch` which runs a batch of documents through the `parent_doc_retriever` using the `aadd_documents` method. As mentioned previously, the `parent_doc_retriever` will automatically chunk, embed, and ingest the documents via its vector and document stores.
Next, let’s define a function that creates the document batches, where each batch consists of `BATCH_SIZE` documents:

```python
def get_batches(docs: List[Document], batch_size: int) -> Generator:
    """
    Return batches of documents to ingest into MongoDB

    Args:
        docs (List[Document]): List of LangChain documents
        batch_size (int): Batch size

    Yields:
        Generator: Batch of documents
    """
    for i in range(0, len(docs), batch_size):
        yield docs[i : i + batch_size]
```
Finally, let’s define the main function that orchestrates the data ingest:
```python
async def process_docs(docs: List[Document]) -> List[None]:
    """
    Asynchronously ingest LangChain documents into MongoDB

    Args:
        docs (List[Document]): List of LangChain documents

    Returns:
        List[None]: Results of the task executions
    """
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    batches = get_batches(docs, BATCH_SIZE)

    tasks = []
    for batch in batches:
        tasks.append(process_batch(batch, semaphore))
    # Gather results from all tasks
    results = await asyncio.gather(*tasks)
    return results
```
The above code:
- Splits up the list of documents to ingest (`docs`) into batches using the `get_batches` function defined previously.
- Creates a task for each batch using the `process_batch` function from before, imposing the concurrency limit using a semaphore.
- Uses `asyncio.gather` to execute tasks concurrently and collect their results. In our case, the tasks don’t return anything; they only ingest documents into MongoDB.
Now, let’s use the `process_docs` function above to ingest the LangChain documents from Step 4 into a MongoDB collection:

```python
collection = mongodb_client[DB_NAME][COLLECTION_NAME]
# Delete any existing documents from the collection
collection.delete_many({})
print("Deletion complete.")
# Ingest LangChain documents into MongoDB
results = await process_docs(docs)
```
The above code:
- Deletes any existing documents from the MongoDB collection that we want to ingest documents into.
- Asynchronously ingests the documents (`docs`) into MongoDB using the `process_docs` function defined previously.
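Once the ingest completes, a quick (optional) way to verify it is to count the documents in the collection. The check below assumes that child chunks store their embeddings under an `embedding` field, which matches the vector index definition used in the next step:

```python
# Optional sanity check: compare total documents vs. embedded child chunks
total_docs = collection.count_documents({})
chunk_docs = collection.count_documents({"embedding": {"$exists": True}})
print(f"{total_docs} documents in the collection, of which {chunk_docs} are embedded child chunks")
```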
Even in parent document retrieval, the first step is to retrieve the child chunks that are most relevant to the user query using semantic/vector search. To perform vector search in MongoDB Atlas, you first need to create a vector search index:
```python
from pymongo.operations import SearchIndexModel
from pymongo.errors import OperationFailure

VS_INDEX_NAME = "vector_index"

# Vector search index definition
model = SearchIndexModel(
    definition={
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 1536,
                "similarity": "cosine",
            }
        ]
    },
    name=VS_INDEX_NAME,
    type="vectorSearch",
)

# Check if the index already exists, if not create it
try:
    collection.create_search_index(model=model)
    print(
        f"Successfully created index {VS_INDEX_NAME} for collection {COLLECTION_NAME}"
    )
except OperationFailure:
    print(
        f"Duplicate index {VS_INDEX_NAME} found for collection {COLLECTION_NAME}. Skipping index creation."
    )
```
The above code:
- Specifies the name of the vector search index (`VS_INDEX_NAME`).
- Creates the vector search index definition, which contains the path to the embeddings field in the documents (`path`), the number of embedding dimensions (`numDimensions`), and the similarity metric to find nearest neighbors (`similarity`).
- Checks if a vector search index with the name `VS_INDEX_NAME` exists on the `COLLECTION_NAME` collection. If it does not, only then does it create the vector search index.
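Note that index creation is asynchronous, so queries issued immediately afterward may return no results. If you want to wait until the index is ready, you can poll its status. The sketch below assumes a recent PyMongo version that supports `list_search_indexes` and that Atlas reports a `queryable` flag for the index:

```python
import time

# Sketch: poll until the vector search index is queryable (assumes PyMongo >= 4.5)
while True:
    indexes = list(collection.list_search_indexes(VS_INDEX_NAME))
    if indexes and indexes[0].get("queryable"):
        print(f"Index {VS_INDEX_NAME} is ready to query")
        break
    time.sleep(5)
```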
To bring this all together, let’s look at how to use parent document retrieval in RAG and agentic workflows.
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Retrieve and parse documents
retrieve = {
    "context": parent_doc_retriever
    | (lambda docs: "\n\n".join([d.page_content for d in docs])),
    "question": RunnablePassthrough(),
}
template = """Answer the question based only on the following context. If no context is provided, respond with I DON'T KNOW: \
{context}

Question: {question}
"""
# Define the chat prompt
prompt = ChatPromptTemplate.from_template(template)
# Define the model to be used for chat completion
llm = ChatOpenAI(temperature=0, model="gpt-4o-2024-11-20")
# Parse output as a string
parse_output = StrOutputParser()
# Naive RAG chain
rag_chain = retrieve | prompt | llm | parse_output
```
The above code creates a RAG workflow with parent document retrieval in LangChain. At a high level, it does the following:
- Gathers context to answer questions using the `parent_doc_retriever` we created in Step 5
- Creates a prompt template (`prompt`) with a system prompt and placeholders for the context and user question
- Initializes the chat completion LLM (`llm`) to use for generating responses
- Creates a simple output parser (`parse_output`) to parse the LLM output as a string
- Chains all the above components using LangChain’s pipe (`|`) notation to create a simple RAG workflow (`rag_chain`)
An example response from the RAG chain is as follows:
```python
print(rag_chain.invoke("How do I improve slow queries in MongoDB?"))

To improve slow queries in MongoDB, you can follow these steps:

1. **Use the Performance Advisor**:
   - The Performance Advisor monitors slow queries and suggests new indexes to improve query performance.
   - Review the suggested indexes, especially those with high Impact scores and low Average Query Targeting scores, and create them if they align with your indexing strategies.

2. **Analyze Query Performance**:
   - Use the **Query Profiler** to explore slow-running operations and their key performance statistics for up to the last 24 hours.
   - Use the **Real-Time Performance Panel (RTPP)** to evaluate query execution times and the ratio of documents scanned to documents returned.

3. **Monitor Query Latency**:
   - Use **Namespace Insights** to monitor collection-level query latency and view query latency metrics and statistics.

4. **Fix Inefficient Queries**:
   - Address `Query Targeting` alerts by adding indexes to support inefficient queries.
   - Use the `cursor.explain()` command to analyze query plans and identify inefficiencies.

5. **Follow Best Practices**:
   - Create queries that are supported by existing indexes.
   - Avoid large array fields in documents that are costly to search and index.
   - Optimize and remove unused or inefficient indexes to balance read and write performance.
   - Perform rolling index builds to minimize performance impact on replica sets and sharded clusters.

6. **Configure Slow Query Threshold**:
   - Adjust the slow query threshold to identify slow queries more effectively. By default, Atlas dynamically adjusts this threshold, but you can set a fixed threshold of 100 milliseconds if needed.
...
   - Ensure queries are supported by indexes.
   - Optimize queries involving `$lookup` or large array fields.

By implementing these steps, you can identify and resolve slow queries, improving overall query performance in MongoDB.
```
Notice the very detailed response, which includes not only steps to fix slow queries but also ways to analyze and monitor query performance, as well as best practices for writing MongoDB queries.
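To see which parent documents were used as context for this answer, you can also invoke the retriever directly and inspect the metadata of the documents it returns:

```python
# Optional: inspect the parent documents retrieved for the same query
retrieved_docs = parent_doc_retriever.invoke("How do I improve slow queries in MongoDB?")
for doc in retrieved_docs:
    print(doc.metadata.get("title"), "->", doc.metadata.get("url"))
```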
In the context of AI agents, you can provide the parent document retriever as one of the tools that an agent can use. Let’s see how to create a basic tool-calling agent using LangGraph, a framework from LangChain that allows you to orchestrate LLM applications as graphs.
First, let’s convert the `parent_doc_retriever` into an agent tool. In LangChain, creating tools is as simple as using the `@tool` decorator on a Python function:

```python
from langchain.agents import tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from typing import Annotated, Dict
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START, END

# Converting the retriever into an agent tool
@tool
def get_info_about_mongodb(user_query: str) -> str:
    """
    Retrieve information about MongoDB.

    Args:
        user_query (str): The user's query string.

    Returns:
        str: The retrieved information formatted as a string.
    """
    docs = parent_doc_retriever.invoke(user_query)
    context = "\n\n".join([d.page_content for d in docs])
    return context

tools = [get_info_about_mongodb]
```
Next, let’s define the prompt for the agent and give it access to the tool(s) defined above:
```python
# Define the LLM to use as the brain of the agent
llm = ChatOpenAI(temperature=0, model="gpt-4o-2024-11-20")
# Agent prompt
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "You are a helpful AI assistant."
            " You are provided with tools to answer questions about MongoDB."
            " Think step-by-step and use these tools to get the information required to answer the user query."
            " Do not re-run tools unless absolutely necessary."
            " If you are not able to get enough information using the tools, reply with I DON'T KNOW."
            " You have access to the following tools: {tool_names}."
        ),
        MessagesPlaceholder(variable_name="messages"),
    ]
)
# Partial the prompt with tool names
prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
# Bind tools to LLM
llm_with_tools = prompt | llm.bind_tools(tools)
```
The above code:
- Instantiates the LLM (`llm`) we want to use as the “brain” of our agent.
- Defines the prompt (`prompt`) for the agent, with placeholders for the tool names and user messages.
- Binds the LLM with the tool(s) defined previously.
Now, let’s orchestrate the agent using LangGraph. LangGraph allows you to build LLM systems as graphs. The graph’s nodes are functions or tools to perform specific tasks, while the edges define routes between nodes—these can be fixed, conditional, or even cyclic. Each graph has a state which is a shared data structure that all the nodes can access and make updates to. Let’s go ahead and define the state, nodes, and edges of our agent’s graph:
```python
# Define graph state
class GraphState(TypedDict):
    messages: Annotated[list, add_messages]

def agent(state: GraphState) -> Dict[str, List]:
    """
    Agent node

    Args:
        state (GraphState): Graph state

    Returns:
        Dict[str, List]: Updates to the graph state
    """
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}

# Convert tools into a graph node
tool_node = ToolNode(tools)

# Parameterize the graph with the state
graph = StateGraph(GraphState)
# Add graph nodes
graph.add_node("agent", agent)
graph.add_node("tools", tool_node)
# Add graph edges
graph.add_edge(START, "agent")
graph.add_edge("tools", "agent")
graph.add_conditional_edges(
    "agent",
    tools_condition,
    {"tools": "tools", END: END},
)
# Compile the graph
app = graph.compile()

# Execute the agent and view outputs
inputs = {
    "messages": [
        ("user", "How do I improve slow queries in MongoDB?"),
    ]
}

for output in app.stream(inputs):
    for key, value in output.items():
        print(f"Node {key}:")
        print(value)

print("---FINAL ANSWER---")
print(value["messages"][-1].content)
```
The above code:
- Defines the graph’s state (`GraphState`). In our graph, we only want to track the user inputs and LLM responses (`messages`) in the state, but you can track other custom attributes.
- Defines the agent node, which is essentially a Python function (`agent`). This function reads existing messages from the graph state, makes a call to the LLM, and appends the response back to the graph state.
- Converts the tool(s) defined previously into a node using the `ToolNode` class.
- Initializes the graph (`StateGraph`), parameterized by the graph’s state.
- Adds the nodes and edges to the graph. Notice the conditional edge that uses LangGraph’s pre-built `tools_condition` function to route to the `ToolNode` if the last message has tool calls; otherwise, it routes to the `END` node.
- Compiles the graph using the `compile()` method.
- Executes the graph in streaming mode using a test input.
In this tutorial, we learned about parent document retrieval and how it can help overcome the limitations of chunking at generation time while retaining its benefits for embedding. We also highlighted some use cases where this technique is particularly useful. Finally, we saw how parent document retrieval works in MongoDB and implemented it in RAG and Agentic workflows using MongoDB’s LangChain integration.
Now that you have a good understanding of this technique, check out the following tutorials to explore different chunking strategies with parent document retrieval, or evaluate this retrieval technique against others:
As always, if you have further questions as you build your AI applications, please reach out to us in our Generative AI community forums.