Synchronize two different MongoDB projects

Hey all,
I want to sync all the collections of two MongoDB Atlas projects (staging and production) from Databricks, more than once a day. Essentially it is a replication of production to staging (updating the existing documents and adding the new entries). I noticed there are some options for this, e.g. mongodump and mongorestore, but those are typically used for one-time backups and restores, not for an ongoing replication scenario.
I am looking for the fastest and most efficient way to do this, because the database is quite large.
I would appreciate any help with this.
My code is below; it takes a long time to run and is not very efficient:

import time
from time import sleep

import pymongo
from tqdm import tqdm

SLEEP_TIME = 0.1

# Get the current time
current_time = time.strftime("%Y-%m-%d %H:%M:%S")

# Start the timer
start_time = time.time()

# Iterate over all collections in the source database
for coll_name in tqdm(src_db.list_collection_names(), desc="Importing collections and creating indexes"):
    if coll_name == "...":
        print(f"\nCopying collection {coll_name}")

        # Create a new collection in the destination database with the same name
        dst_coll = dst_db[coll_name]
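        # Recreate the source collection's indexes on the destination; strip the
        # "ns", "v" and "key" entries from the index info, since create_index()
        # takes the key spec positionally and does not accept those as options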
        for name, index_info in src_db[coll_name].index_information().items():
            keys = index_info["key"]
            if "ns" in index_info:
                del index_info["ns"]
            del index_info["v"]
            del index_info["key"]
            dst_coll.create_index(keys, name=name, **index_info)

        # Iterate over all documents in the source collection
        coll_size = src_db[coll_name].count_documents({})
        CHUNK_SIZE = 100
        LIMIT = 50000
        cursor = src_db[coll_name].find(
            {"...": {"$ne": []}},
            batch_size=CHUNK_SIZE,
            limit=LIMIT
        )

        def yield_rows(cursor, chunk_size):
            """Yield documents from the cursor in lists of at most chunk_size."""
            chunk = []
            for row in cursor:
                chunk.append(row)
                if len(chunk) == chunk_size:
                    yield chunk
                    chunk = []
            if chunk:
                yield chunk

        chunks = yield_rows(cursor, CHUNK_SIZE)

        for chunk in tqdm(
                chunks, desc="Copying JSON documents in batches", total=round(LIMIT / CHUNK_SIZE)
        ):
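            # Upsert every document in the chunk into the destination with a single bulk_write call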
            operations = [
                pymongo.UpdateOne({"_id": doc["..."]}, {"$set": doc}, upsert=True)
                for doc in chunk
            ]
            result = dst_coll.bulk_write(operations)

            sleep(SLEEP_TIME)

# Print the last sync time and duration
print(f"\nLast sync time: {current_time}")
print("Sync duration: {:.2f} seconds".format(time.time() - start_time))

# Close the connections
src_client.close()
dst_client.close()

Hi @Nazila_Hashemi
What products are you using from Databricks?

For a continuous replication scenario, using the MongoDB Spark Connector in streaming mode could be a good pattern, for example along these lines.
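A rough sketch of what that could look like in a Databricks notebook, assuming connector v10.x is installed on the cluster; mongo_source_uri / mongo_target_uri stand for your two Atlas connection strings, and the database/collection names, schema, and checkpoint path are placeholders (option names vary between connector versions, so check the docs for yours):

# Rough sketch only, assuming MongoDB Spark Connector v10.x on Databricks.
# "my_database", "my_collection", the schema and the checkpoint path are placeholders.
from pyspark.sql.types import StructType, StructField, StringType

read_schema = StructType([StructField("_id", StringType(), True)])  # extend with your fields

# Read a change-stream-backed stream from the source (production) collection
stream_df = (
    spark.readStream.format("mongodb")
    .option("spark.mongodb.connection.uri", mongo_source_uri)
    .option("spark.mongodb.database", "my_database")
    .option("spark.mongodb.collection", "my_collection")
    .schema(read_schema)
    .load()
)

# Continuously write the stream into the target (staging) collection
query = (
    stream_df.writeStream.format("mongodb")
    .option("checkpointLocation", "/tmp/mongo_sync_checkpoint")
    .option("spark.mongodb.connection.uri", mongo_target_uri)
    .option("spark.mongodb.database", "my_database")
    .option("spark.mongodb.collection", "my_collection")
    .outputMode("append")
    .start()
)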

You could also use change streams directly, depending on the use case: https://www.mongodb.com/docs/manual/changeStreams/
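For example, with PyMongo you can tail a change stream on the source database and apply each change to the target. A minimal sketch, where mongo_source_uri / mongo_target_uri stand for your two Atlas connection strings and "my_database" is a placeholder; in practice you would also persist the resume token between runs so restarts only replay new changes:

# Minimal sketch: tail a change stream on the source database and mirror each change
# to the destination. "my_database" is a placeholder, and resume-token persistence
# is omitted for brevity.
from pymongo import MongoClient

src_db = MongoClient(mongo_source_uri)["my_database"]
dst_db = MongoClient(mongo_target_uri)["my_database"]

with src_db.watch(full_document="updateLookup") as stream:
    for change in stream:
        coll_name = change["ns"]["coll"]
        op = change["operationType"]
        if op in ("insert", "update", "replace"):
            doc = change["fullDocument"]
            dst_db[coll_name].replace_one({"_id": doc["_id"]}, doc, upsert=True)
        elif op == "delete":
            dst_db[coll_name].delete_one({"_id": change["documentKey"]["_id"]})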


Hi @Prakul_Agarwal,

Thank you for your response.
Sorry, I guess I was not clear enough!
I am using a Databricks notebook just to connect to the MongoDB databases; my aim is to synchronize two different projects on MongoDB Atlas.
In Databricks:

from pymongo import MongoClient

# define the source and target (destination)
mongo_source_uri = dbutils.secrets.get(
    "keyvault", "..."
)

mongo_target_uri = dbutils.secrets.get(
    "keyvault", "..."
)
src_client = MongoClient(mongo_source_uri)
src_db = src_client["collection"]

dst_client = MongoClient(mongo_target_uri)
dst_db = dst_client["collection"]

The rest of the code is the same as in the first post.
So Databricks is kind of a bridge to sync both of my projects on MongoDB Atlas.

I think something similar has been answered here
