How does find work when the data the cursor is reading is modified?

My code needs to loop through documents without timestamps, read each doc, delete it, and reinsert it (the reinsert is needed to fire a trigger).
I found that the number of deletes/inserts is higher than the number of docs without timestamps: some of those docs are processed twice.
I also found that adding sort to the find resolves the issue. The problem only appears when the bulk batch holds about 100 operations or fewer, and when the total number of documents is bigger than the batch size.

Could somebody explain what is going on under the hood?

Code to repro the issue:

from datetime import datetime
import logging

import logging.handlers
from pymongo import MongoClient, InsertOne, ReplaceOne, UpdateOne, DeleteOne, ASCENDING, DESCENDING
from pymongo.errors import BulkWriteError
import certifi



def bulk_write_operations(collection, operations, force):
    # Flush the accumulated operations with a single bulk_write.
    # Unless force=True, nothing is written until a full batch of
    # batch_size operations (defined below) has accumulated.
    if not operations:
        return True

    if not force and (len(operations) < batch_size):
        return True

    try:
        # Bulk write the operations
        result = collection.bulk_write(operations, ordered=True)
        print(f'{len(operations)} total records processed. {result.bulk_api_result}')

    except BulkWriteError as e:
        # details has no top-level "errmsg" key; print the full error details
        print(e.details)
        return False

    del operations[:]
    return True


batch_size = 49

uri="XXXX?ssl=true&authSource=admin"
client = MongoClient(uri, tlsCAFile=certifi.where())
mydb = client["test"]
coll_main = mydb["testReprocess"]

# create 10 docs with timestamps
numRec = 10
operation_ins = []
for i in range(numRec):
    res = {'fileName':"testReprocess", "i": i, "log_ts_bv_processing": datetime.utcnow()}
    operation_ins.append(InsertOne(res))
    bulk_write_operations(coll_main, operation_ins, False)

# create 200 docs without timestamps
numRec = 200
for i in range(numRec):
    res = {'fileName': "testReprocess", "i": i}
    operation_ins.append(InsertOne(res))
    bulk_write_operations(coll_main, operation_ins, False)

bulk_write_operations(coll_main, operation_ins, True)


filter = {'fileName':"testReprocess", 'log_ts_bv_processing': {'$exists': False}}

num1 = coll_main.count_documents(filter)
print (f"Number of docs to reprocess: {num1}")

# workaround: adding sort to find
#result = coll_main.find(filter).sort('_id', ASCENDING)

# try to delete and insert documents in batches
result = coll_main.find(filter)
cnt = 0
operation_del = []
operation_ins = []
for res in result:
    delCondition = {"_id": res["_id"]}
    # print(f"_id: {res['_id']}")
    operation_del.append(DeleteOne(delCondition))
    operation_ins.append(InsertOne(res))
    cnt += 1
    bulk_write_operations(coll_main, operation_del, False)
    bulk_write_operations(coll_main, operation_ins, False)
    if (cnt % 10) == 0:
        print(f"Reprocessing {cnt} stalled loaded documents...")

print(f"len of operation_del: {len(operation_del)} ")

bulk_write_operations(coll_main, operation_del, True)
bulk_write_operations(coll_main, operation_ins, True)
print(f"Total: Reprocessed {cnt} stalled loaded documents")
==================
Results:
Number of docs to reprocess: 200
Reprocessing 10 stalled loaded documents...
Reprocessing 20 stalled loaded documents...
Reprocessing 30 stalled loaded documents...
Reprocessing 40 stalled loaded documents...
Reprocessing 50 stalled loaded documents...
Reprocessing 60 stalled loaded documents...
Reprocessing 70 stalled loaded documents...
Reprocessing 80 stalled loaded documents...
Reprocessing 90 stalled loaded documents...
Reprocessing 100 stalled loaded documents...
Reprocessing 110 stalled loaded documents...
Reprocessing 120 stalled loaded documents...
Reprocessing 130 stalled loaded documents...
Reprocessing 140 stalled loaded documents...
Reprocessing 150 stalled loaded documents...
Reprocessing 160 stalled loaded documents...
Reprocessing 170 stalled loaded documents...
Reprocessing 180 stalled loaded documents...
Reprocessing 190 stalled loaded documents...
Reprocessing 200 stalled loaded documents...
Reprocessing 210 stalled loaded documents...
Reprocessing 220 stalled loaded documents...
Reprocessing 230 stalled loaded documents...
Reprocessing 240 stalled loaded documents...
Reprocessing 250 stalled loaded documents...
Reprocessing 260 stalled loaded documents...
Reprocessing 270 stalled loaded documents...
Reprocessing 280 stalled loaded documents...
Reprocessing 290 stalled loaded documents...
len of operation_del: 4 
Total: Reprocessed 298 stalled loaded documents

So it was supposed to reinsert 200 docs but ended up reinserting 298. When the number of docs is in the millions, the difference is significant.

Hi @SERGIY_MITIN and welcome to the MongoDB community forums!

Thank you for sharing such a detailed reproduction.

As noted in the MongoDB documentation on CRUD concepts, a cursor may, under certain circumstances, return the same document more than once. That is why your loop touches more documents than the count you computed up front: when a document is deleted and reinserted while an unsorted cursor is still open, the cursor can come across the reinserted copy again later in its scan.

Without an explicit sort, find() does not guarantee any particular order: documents are returned in whatever order the server encounters them, and that order can shift while documents are being deleted and reinserted underneath the cursor.

When you call bulk_write() with ordered=True (as your script does, and as is the default), MongoDB executes the listed operations sequentially, in the order they appear. The bulk writes themselves are not the problem; the issue is that the cursor driving them has no defined order, so some documents can be returned, and therefore deleted and reinserted, more than once.
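
To make the "ordered" part concrete, here is a small sketch (the collection name demoOrdered is hypothetical; mydb, InsertOne and BulkWriteError are the ones already defined in your script). With ordered=True the server applies the operations one at a time and stops at the first failure, so the duplicate insert of _id 1 ends the batch and _id 2 is never attempted:

# Hypothetical demo collection, reusing mydb from the script above
demo = mydb["demoOrdered"]
demo.drop()
try:
    demo.bulk_write(
        [InsertOne({"_id": 1}), InsertOne({"_id": 1}), InsertOne({"_id": 2})],
        ordered=True,
    )
except BulkWriteError as e:
    # ordered=True stops at the first error; writeErrors describes it
    print(e.details["writeErrors"][0]["errmsg"])
print(demo.count_documents({}))   # prints 1: only the first insert was applied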

Adding sort() gives the cursor a defined order: it walks the documents in a fixed sequence (by _id in your workaround), so each matching document is returned once and is not processed twice.
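
For reference, here is a sketch of that workaround applied to your reprocessing loop; it is essentially your commented-out line put back in, reusing the coll_main, filter, ASCENDING and bulk_write_operations already defined in your script:

# Sorting on _id walks the _id index in a fixed order, so a document that is
# deleted and reinserted with the same _id is not returned a second time.
result = coll_main.find(filter).sort('_id', ASCENDING)

operation_del = []
operation_ins = []
for res in result:
    operation_del.append(DeleteOne({"_id": res["_id"]}))
    operation_ins.append(InsertOne(res))
    bulk_write_operations(coll_main, operation_del, False)
    bulk_write_operations(coll_main, operation_ins, False)

bulk_write_operations(coll_main, operation_del, True)
bulk_write_operations(coll_main, operation_ins, True)

Since _id is always indexed, this sort is served by the index and does not require an in-memory sort, even on large collections.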

Please don’t hesitate to reach out if you have any further questions or need additional clarification.

Regards
Aasawari