My code needs to loop through documents that have no timestamp, read each doc, delete it, and reinsert it (the delete/reinsert is needed to fire a trigger).
I found that the number of deletes/inserts ends up higher than the number of docs without timestamps: some of those docs are processed twice.
I also found that adding a sort to the find resolves the issue. The problem only appears with a bulk batch size of 100 operations or fewer, and only when the total number of documents is larger than the batch size.
Could somebody explain what is going on under the hood?
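For reference, the workaround is a one-line change to the read (it is left commented out in the repro below):

result = coll_main.find(filter).sort('_id', ASCENDING)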
Code to repro the issue:
from datetime import datetime
from pymongo import MongoClient, InsertOne, DeleteOne, ASCENDING
from pymongo.errors import BulkWriteError
import certifi
def bulk_write_operations(collection, operations, force):
    # Nothing accumulated yet
    if not operations:
        return True
    # Unless forced, wait until a full batch has accumulated
    if not force and (len(operations) < batch_size):
        return True
    try:
        # Bulk write the operations
        result = collection.bulk_write(operations, ordered=True)
        print(f'{len(operations)} total records processed. {result.bulk_api_result}')
    except BulkWriteError as e:
        print(e.details["errmsg"])
        return False
    # Clear the list in place so the caller's list object is emptied too
    del operations[:]
    return True
batch_size = 49
uri="XXXX?ssl=true&authSource=admin"
client = MongoClient(uri, tlsCAFile=certifi.where())
mydb = client["test"]
coll_main = mydb["testReprocess"]
# create 10 docs with timestamps
numRec = 10
operation_ins = []
for i in range(numRec):
    res = {'fileName': "testReprocess", "i": i, "log_ts_bv_processing": datetime.utcnow()}
    operation_ins.append(InsertOne(res))
    bulk_write_operations(coll_main, operation_ins, False)
# create 200 docs without timestamps
numRec = 200
for i in range(numRec):
    res = {'fileName': "testReprocess", "i": i}
    operation_ins.append(InsertOne(res))
    bulk_write_operations(coll_main, operation_ins, False)
bulk_write_operations(coll_main, operation_ins, True)
filter = {'fileName':"testReprocess", 'log_ts_bv_processing': {'$exists': False}}
num1 = coll_main.count_documents(filter)
print (f"Number of docs to reprocess: {num1}")
# workaround: adding sort to find
#result = coll_main.find(filter).sort('_id', ASCENDING)
# try to delete and insert documents in batches
result = coll_main.find(filter)
cnt = 0
operation_del = []
operation_ins = []
for res in result:
    delCondition = {"_id": res["_id"]}
    # print(f"_id: {res['_id']}")
    operation_del.append(DeleteOne(delCondition))
    operation_ins.append(InsertOne(res))
    cnt += 1
    # flush once a full batch of batch_size operations has accumulated
    bulk_write_operations(coll_main, operation_del, False)
    bulk_write_operations(coll_main, operation_ins, False)
    if (cnt % 10) == 0:
        print(f"Reprocessing {cnt} stalled loaded documents...")
print(f"len of operation_del: {len(operation_del)} ")
# force-flush whatever is left in the final partial batch
bulk_write_operations(coll_main, operation_del, True)
bulk_write_operations(coll_main, operation_ins, True)
print(f"Total: Reprocessed {cnt} stalled loaded documents")
==================
Results:
Number of docs to reprocess: 200
Reprocessing 10 stalled loaded documents...
Reprocessing 20 stalled loaded documents...
Reprocessing 30 stalled loaded documents...
Reprocessing 40 stalled loaded documents...
Reprocessing 50 stalled loaded documents...
Reprocessing 60 stalled loaded documents...
Reprocessing 70 stalled loaded documents...
Reprocessing 80 stalled loaded documents...
Reprocessing 90 stalled loaded documents...
Reprocessing 100 stalled loaded documents...
Reprocessing 110 stalled loaded documents...
Reprocessing 120 stalled loaded documents...
Reprocessing 130 stalled loaded documents...
Reprocessing 140 stalled loaded documents...
Reprocessing 150 stalled loaded documents...
Reprocessing 160 stalled loaded documents...
Reprocessing 170 stalled loaded documents...
Reprocessing 180 stalled loaded documents...
Reprocessing 190 stalled loaded documents...
Reprocessing 200 stalled loaded documents...
Reprocessing 210 stalled loaded documents...
Reprocessing 220 stalled loaded documents...
Reprocessing 230 stalled loaded documents...
Reprocessing 240 stalled loaded documents...
Reprocessing 250 stalled loaded documents...
Reprocessing 260 stalled loaded documents...
Reprocessing 270 stalled loaded documents...
Reprocessing 280 stalled loaded documents...
Reprocessing 290 stalled loaded documents...
len of operation_del: 4
Total: Reprocessed 298 stalled loaded documents
So it was supposed to reinsert 200 docs but ended up reinserting 298. When the number of docs is in the millions, the difference is significant.
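For completeness, one approach that should also avoid the double processing (without the sort) is to fully drain the cursor before issuing any writes, so the read can never see the reinserted docs. This is just a rough sketch reusing the helpers above, not what the repro does, and it assumes the matching docs fit in memory:

# Sketch only: materialize the whole result set first, then delete/reinsert.
# Assumes the matching docs fit in memory; with millions of docs the read
# would have to be chunked (e.g. by _id ranges).
operation_del = []
operation_ins = []
docs = list(coll_main.find(filter))   # cursor fully drained before any write
for res in docs:
    operation_del.append(DeleteOne({"_id": res["_id"]}))
    operation_ins.append(InsertOne(res))
    bulk_write_operations(coll_main, operation_del, False)
    bulk_write_operations(coll_main, operation_ins, False)
bulk_write_operations(coll_main, operation_del, True)
bulk_write_operations(coll_main, operation_ins, True)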