Any way to get faster inserts?

Hi guys.I use bucket pattern for timeseries data.I use this code for importing my data into my table :

for file in sorted_files:
    df = process_file(file)

    for row,item in df.iterrows():
        data_dict = item.to_dict()
        mycol1.update_one(
            {"nsamples": {"$lt": 288}},
            {
                "$push": {"samples": data_dict},

                "$inc": {"nsamples": 1}
            },
            upsert=True
        )

The problem is that the insert is very very slow.Is there any way to get things done faster?Is there a way to do this with bulk insert?Thanks in advance guys!

@Pavel_Duchovny Hello Pavel.I am sorry for disturbing you.Can you help me with that?Is it possible to do multiple updates at once instead of one?

Hi @harris,

Sure. You can use a bulk updates using Bulk.find.upsert() syntax:

Tou can use unordered updates if the order don’t matter which will be parallel.

Let me know if that works for you

Thanks
Pavel

Thank you for you reply @Pavel_Duchovny
Do you mean something like that

for file in sorted_files:
    df = process_file(file)
    var bulk = mydb1.mycol1.initializeOrderedBulkOp()
    for row,item in df.iterrows():
        data_dict = item.to_dict()
        bulk.find().upsert().update_one(
            {"nsamples": {"$lt": 288}},
            {
                "$push": {"samples": data_dict},
                "$inc": {"nsamples": 1}
            },
            upsert=True
        )
        bulk.execute

Well first you need an upsert command it has its own in bulk.

Now you need to do the criteria in the find and in the upsert do the push. Accumulated bulk in the item for loop should be executed in outside the loop.

Essentially you build a bulk on client side and do the upsert after the loop avoiding the need to update per loop cycle…

Thanks
Pavel

@Pavel_Duchovny Thanks you for helping me.I cant do it on my own.If its possible can you write me with code what should i do?i know i am asking a lot but i am drowing on my own.

@Pavel_Duchovny Do you mean something like this:

for file in sorted_files:
    df = process_file(file)
    bulk = mydb1.mycol1.initializeOrderedBulkOp()
    for row,item in df.iterrows():
        data_dict = item.to_dict()
        bulk.find({"nsamples": {"$lt": 288}}).upsert().update_one(
            {
                "$push": {"samples": data_dict},
                "$inc": {"nsamples": 1}
            },
            upsert=True
        )
        bulk.execute

The execute should be done each x loops or outside of the loop.

You can run a counter and do every 1000 loops an execute and one at the end.

Thanks
Pavel

Is it a problem that in the start the table is empty?

Hi @harris,

No problem of running on empty collection but index the filter field for when its get filled.

But now that you say that I don’t understand the purpose of this update.

If you do $lt of the same number it will keep pushing to the same document creating a huge array.

You might be bottleneck by the array pushes … Why not to get the _id and spread the data by a bucket id or something …

Thanks
Pavel

What do you think of that?

 bulk_request = []
for file in sorted_files:
    df = process_file(file)
   
    for row, item in df.iterrows():
        data_dict = item.to_dict()
        bulk_request=mycol1.update_one(
            {"nsamples": {"$lt": 12}},
            {
                "$push": {"samples": data_dict},
                "$inc": {"nsamples": 1}
            },
            upsert=True
        )
    result = mycol1.bulk_write(bulk_request)

I dont think i see any changes in insert time…

It looks like you are doing regular updates and not bulk…
What makes you think this code is doing bulk updates…

Yes i edited the answer. I’m sorry I bothered you, i just cant understand how bulk works.Thanks for you patience.I appreciate it alot!

This is my final try

bulk_request=[]
for file in sorted_files:
    df = process_file(file)
    for row, item in df.iterrows():
        data_dict = item.to_dict()
        bulk_request.append(UpdateOne(
            {"nsamples": {"$lt": 12}},
            {
                "$push": {"samples": data_dict},
                "$inc": {"nsamples": 1}
            },
            upsert=True
        ))
    result = mycol1.bulk_write(bulk_request)

Why not to do the final bulk write after the main loop?

Why should i do that?If i keep it there i do bulk_write for each file…if i move it to the main loop i do one bulk_write in the end for all files?Is this why i should do the final bulk write after the main loop?Its optimal right?

Well it depends on expected amount of operations in a single bulk.

If it under 1000 you can do just one bulk operation.

It is the same collection so the minimum the client to database round trips the better the performance…

Thanks
Pavel

Why should i check if its under 1000?in my collection its about 1,2m rows importing with banches of 12 so its about 90.000 operations if i understand right…but mongodb i think does the divide on its own.i mean if its 2000 for example it divides the group in half
And one last thing…if i use updatemany instead of updateone here

bulk_request=[]
for file in sorted_files:
    df = process_file(file)
    for row, item in df.iterrows():
        data_dict = item.to_dict()
        bulk_request.append(UpdateOne(
            {"nsamples": {"$lt": 12}},
            {
                "$push": {"samples": data_dict},
                "$inc": {"nsamples": 1}
            },
            upsert=True
        ))
    result = mycol1.bulk_write(bulk_request)

Do i see any changes in terms of insertion time?

Hi @harris,

To advise any further I need the breakdown of the 1.2 M

How many files are there in this loop?

How much rows per file?

Is there only one python client processing all this data at once? Can you split it into threads?

Thanks
Pavel

11 files each one consist of 105000 rows…there is only one python client