Hey all,
I want to sync all the collections of two MongoDB Atlas projects (staging and production) in Databricks more than once a day. Essentially this is a replication of production into staging (updating existing documents and adding new entries). I noticed there are options such as mongodump and mongorestore, but those are typically used for one-time backups and restores rather than an ongoing replication scenario; a rough sketch of that approach is shown below for reference.
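A minimal sketch of that one-shot dump/restore approach, assuming the MongoDB Database Tools (mongodump/mongorestore) are installed; the connection strings and dump directory below are placeholders:

import subprocess

SRC_URI = "mongodb+srv://<user>:<password>@production-cluster.mongodb.net"  # placeholder
DST_URI = "mongodb+srv://<user>:<password>@staging-cluster.mongodb.net"     # placeholder
DUMP_DIR = "dump"

# Dump the readable databases from production into a local directory (gzip-compressed BSON)
subprocess.run(["mongodump", "--uri", SRC_URI, "--gzip", "--out", DUMP_DIR], check=True)

# Restore into staging, dropping existing collections first so the copy is a clean overwrite
subprocess.run(["mongorestore", "--uri", DST_URI, "--gzip", "--drop", DUMP_DIR], check=True)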
I am looking for the fastest and most efficient way to do this, because the database is quite large.
I would appreciate any help with this.
My code is below; it is taking a long time to run and is not very efficient:
import time
from time import sleep

import pymongo
from tqdm import tqdm

# src_client / dst_client and src_db / dst_db are assumed to have been created earlier
# (connection setup not shown).

SLEEP_TIME = 0.1
CHUNK_SIZE = 100
LIMIT = 50000


def yield_rows(cursor, chunk_size):
    """
    Generator to yield chunks of documents from a cursor
    :param cursor: pymongo cursor over the source collection
    :param chunk_size: number of documents per chunk
    :return: lists of documents of length <= chunk_size
    """
    chunk = []
    for i, row in enumerate(cursor):
        if i % chunk_size == 0 and i > 0:
            yield chunk
            chunk = []
        chunk.append(row)
    if chunk:
        yield chunk


# Get the current time
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
# Start the timer
start_time = time.time()

# Iterate over all collections in the source database
for coll_name in tqdm(src_db.list_collection_names(), desc="Importing collections and creating indexes"):
    if coll_name == "...":
        print(f"\nCopying collection {coll_name}")

        # Create a collection in the destination database with the same name
        dst_coll = dst_db[coll_name]

        # Recreate the source collection's indexes on the destination collection
        for name, index_info in src_db[coll_name].index_information().items():
            keys = index_info["key"]
            if "ns" in index_info:
                del index_info["ns"]
            del index_info["v"]
            del index_info["key"]
            dst_coll.create_index(keys, name=name, **index_info)

        # Iterate over all documents in the source collection
        coll_size = src_db[coll_name].count_documents({})
        cursor = src_db[coll_name].find(
            {"...": {"$ne": []}},
            batch_size=CHUNK_SIZE,
            limit=LIMIT,
        )

        chunks = yield_rows(cursor, CHUNK_SIZE)
        for chunk in tqdm(
            chunks, desc="Copying JSON documents in batches", total=round(LIMIT / CHUNK_SIZE)
        ):
            # Upsert each document in the chunk into the destination collection
            operations = [
                pymongo.UpdateOne({"_id": doc["..."]}, {"$set": doc}, upsert=True)
                for doc in chunk
            ]
            result = dst_coll.bulk_write(operations)
            sleep(SLEEP_TIME)

# Print the last sync time and duration
print(f"\nLast sync time: {current_time}")
print("Sync duration: {:.2f} seconds".format(time.time() - start_time))

# Close the connections
src_client.close()
dst_client.close()