Using transactions to clear and re-populate a collection

Hi all!

I’ve an application that uses a Collection with a lot of documents, that are embeddings for a RAG application. I’ve them in a MongoDB Atlas instance, the Vector Search is configured, and everything works fine.

The issue here is when I need to update these embeddings inside the collection. The idea is to completely remove all the old documents and insert the new one in an unique transaction, because of a precise requirement: we cannot experience any downtime of the RAG application that uses this collection.

For more context, this is the Python code to execute the logic:

def insert_documents_with_transaction(client, collection, docs_to_insert):
    with client.start_session() as session:
        try:
            with session.start_transaction():
                collection.delete_many({}, session=session)
                insert_result = collection.insert_many(docs_to_insert, session=session)
                session.commit_transaction()
        except Exception as err:
            raise Exception(err)

Consider that the amount of documents to remove/insert are between 4000 and 5000.

This code, while might work in a local replica set or a small MongoDB Atlas, in production environments crashes because of a TransientTransactionError with code 251 - NoSuchTransaction. I know this is usually caused by concurrent operations or timeout of the transaciton, but there are no other threads/operations writing to the same repository and the error happens after 5-6 seconds.

So I’m wondering what’s wrong here. Is it because of the high amount of documents that I’m including in the transaction? Or probably the configured Vector Search index causes this exception? Or simply it is just a memory issue?

I’m either trying to find a solution or to better understand the logic behind to find the proper solution to this. Thanks to anyone willing to help.

I would suggest avoiding transaction for this use case for two reasons:
Firstly, your transaction is being automatically aborted either due to a write conflict or because it is simply taking too long (exceeds transactionLifetimeLimitSeconds which defaults to 1 minute).

Secondly, the vector search index is updated asynchronously regardless of if you use a transaction or not.

One alternative design is to write the data to a new collection and the do a rename with dropTarget=True to replace the data. Note that this may abort any active reads on the collection so you may need to introduce a retry mechanism into your query logic.

def replace_collection(collection, docs_to_insert):
    temp_coll = collection[f"temp-{random.random()}"]
    temp_coll.insert_many(docs_to_insert)
    temp_coll.rename(collection.name, dropTarget=True)

Hi Shane! Thanks so much for your answer.

The failures I’ve noticed in my code occur in less than 1 minute (few seconds), this is why I believed that it was the maxTransactionLockRequestTimeoutMillis, which defaults to 5 seconds. But unfortunately the error messages are not so explicit and there’s no much of public knowledge (I guess it doesn’t happen often). Not a big issue anyway.

Your snippet is quite interesting. Despite the abortion of active read (adding a retry is not a problem). I assume you run it without any transaction, right?

Anyway, I’m more interested in this sentence you just wrote:

Secondly, the vector search index is updated asynchronously regardless of if you use a transaction or not.

I didn’t know about that. Does that mean that the saerch results will be affected by the old data in the period of time between the update of the collection and the completion of the re-indexing? Do you have any link or article I can study on?

The failures I’ve noticed in my code occur in less than 1 minute (few seconds), this is why I believed that it was the maxTransactionLockRequestTimeoutMillis, which defaults to 5 seconds. But unfortunately the error messages are not so explicit and there’s no much of public knowledge (I guess it doesn’t happen often). Not a big issue anyway.

A transaction can be aborted for many reasons and unfortunately sometimes the error message is not clear. It could be WriteConflicts with other writes to the same document (transactional or not), or that the total data size touched by the transaction exceeds a limit (related to WiredTiger cache size). For these reasons, best practice is to keep transactions small (“no more than 1,000 documents”) according to our guidance here.

Does that mean that the saerch results will be affected by the old data in the period of time between the update of the collection and the completion of the re-indexing?

Good question! I forgot to mention that the old vector search index will be dropped during the rename (along with any other indexes). You will need to build a vector search index on the new collection before renaming:

def create_search_index(coll):
    """Create the vector search index and wait for it to be queryable."""
    name = coll.create_search_index(...)
    start = time.monotonic()
    timeout_seconds = 60
    while time.monotonic() - start < timeout_seconds:
        indexes = list(coll.list_search_indexes(name=name))
        if indexes and indexes[0].get("queryable"):
            return
        time.sleep(1)
    raise TimeoutError(f"timed out after waiting {timeout_seconds}s for search index to be created")


def replace_collection(collection, docs_to_insert):
    temp_coll = collection[f"temp-{random.random()}"]
    temp_coll.insert_many(docs_to_insert)
    create_search_index(temp_coll)
    temp_coll.rename(collection.name, dropTarget=True)

Do you have any link or article I can study on?

I can’t find a link that explains this behavior but it is what I’ve found to be true from experimenting with the api. I’ll ask in internal channels to see if this can be documented.

Thanks. Shane!

Much appreciated, for both your explanations and your interest on inquiring on this.

At the end, we decided to go to a direction similar to the one you proposed with the create_search_index.

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.