Updating 100s of Millions of Documents

Hi,

I have built a service similar to Shodan which catalogues and indexes internet hosts. I’m currently really struggling with updating documents at scale and would appreciate some advice.

Current Architecture (all distributed, all running latest Mongo version 4.4)

  • 3 x Mongos

  • 3 x configsvr in replicaset (configReplSet)

  • 10 x shardsvr in replicaset (shardReplSet)

Note: all nodes communicate over at least a 1 Gbps line and have an uptime of > 99.9%.

The cluster has a unique index on the “ip” field.

I use Motor as my Python-Mongo adapter, which itself sits on top of PyMongo.

The service itself performs a scan and aggregates details about a host; it then prepares them for insertion into the Mongo cluster. I use a simple try/except block for the initial inserts:

            try:
                results = await self.mongo.insert_many(prepared_hosts, ordered=False)
            except BulkWriteError as e:
                details: dict = e.details
                self.logger.info(f"There were some host insertion errors:"
                                 f"\nwriteErrors: {len(details.get('writeErrors'))}"
                                 f"\nnInserted: {details.get('nInserted')}"
                                 f"\nwriteConcernErrors: {details.get('writeConcernErrors')}"
                                 f"\nnUpserted: {details.get('nUpserted')}"
                                 f"\nnModified: {details.get('nModified')}"
                                 f"\nupserted: {details.get('upserted')}"
                                 )

Note: self.mongo.insert_many is a wrapper around PyMongo’s insert_many

Inside the except block I aggregate the writeErrors, and any that failed with code 11000 (E11000 duplicate key error collection) are prepared for update. And this is where the problems begin…
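For reference, collecting the duplicates looks roughly like this (a simplified sketch; each writeErrors entry carries the offending document under its "op" key):

DUPLICATE_KEY_ERROR = 11000

def extract_duplicates(details: dict) -> list:
    # Pull out the documents that failed with E11000 so they can be
    # routed to the update path.
    return [
        err["op"]
        for err in details.get("writeErrors", [])
        if err.get("code") == DUPLICATE_KEY_ERROR
    ]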

Attempt 1

  1. Pull the existing document from the collection
  2. Merge the new and old data
  3. Perform an update_one

Because each job was producing anywhere from 100,000 to 10,000,000 documents for processing, this bottleneck more often than not caused the job to run for days.
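In code, those three steps looked roughly like this (a simplified sketch; merge_host stands in for my real merge logic):

async def update_duplicate(collection, new_doc: dict):
    # 1. Pull the existing document for this IP
    existing = await collection.find_one({"ip": new_doc["ip"]})

    # 2. Merge old and new data in Python
    merged = merge_host(existing, new_doc)
    merged.pop("_id", None)  # _id is immutable, don't try to overwrite it

    # 3. Write the merged document back - one full round trip per duplicate
    await collection.update_one({"ip": new_doc["ip"]}, {"$set": merged})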

Attempt 2

  1. Push any duplicate documents to a “staging” queue
  2. A large number of workers consume the queue and perform updates in more or less the same way as laid out in Attempt 1

This meant the jobs finished in a very reasonable time, but even with 256 workers I could only manage approximately 1,000 updates/second, and the queue simply ballooned out of control, spiralling to upwards of 200,000,000 documents within a few days.
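The consumer side is essentially the Attempt 1 logic behind a queue. In-process it would look roughly like this (a simplified sketch using asyncio.Queue; the real staging queue is external, and update_duplicate is the read-merge-write from the sketch above):

import asyncio

async def worker(queue: asyncio.Queue, collection):
    # Each worker pops a duplicate host off the staging queue and runs
    # the Attempt 1 read-merge-write against the cluster.
    while True:
        new_doc = await queue.get()
        try:
            await update_duplicate(collection, new_doc)
        finally:
            queue.task_done()

async def drain(queue: asyncio.Queue, collection, n_workers: int = 256):
    workers = [asyncio.create_task(worker(queue, collection)) for _ in range(n_workers)]
    await queue.join()  # wait for every queued duplicate to be processed
    for w in workers:
        w.cancel()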

And that is where I am today. I would appreciate any thoughts on the matter, whether it’s process, architecture, both or whatever.

Hi @Darrel_Rendell and welcome to the MongoDB Community :muscle: !

From what I’m reading, it sounds like you want to insert the document if it doesn’t exist and update it if it already exists in your database. So you would like to use an upsert, which does exactly that.

Also, because you apparently have big batches, you want to use the BulkWrite operation combined with many updateOne (or replaceOne) operations with the upsert: true option.

This avoids the try / fail / retry-with-an-update logic you have implemented. As the IP address is apparently your key, you can use it as the filter for the upsert operations.
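For example, with PyMongo it could look something like this (a minimal sketch; coll and prepared_hosts are placeholders for your collection and your batch):

from pymongo import ReplaceOne

# One bulk round trip: each host is inserted if its IP is new,
# otherwise it replaces the existing document.
requests = [ReplaceOne({"ip": doc["ip"]}, doc, upsert=True) for doc in prepared_hosts]
result = coll.bulk_write(requests, ordered=False)
print(result.upserted_count, result.modified_count)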

Cheers,
Maxime.

Hi Maxime,

Thanks for your reply!

Upserting is something I considered, but the challenge I have is that when I’m processing documents, the assumption is the IP hasn’t been seen before, so it is initialised with a default object:

{   
   "ip": str,
   "ports": Array[int],
   "banners": Array[Object],
   "certificates": Array[Object],
   "first_scanned": ISODate,
   "last_scanned": ISODate,
   "headers": Array[Object]
}

If a document with the provided IP exists (unique index), we then have to merge the documents. This is because inserting elements into banners, certificates or headers almost always requires modifying the ports array. If I did an upsert, it would set the new array element but not allow me to merge the ports.
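To make it concrete, the merge effectively has to do something like this per IP (a simplified sketch, not my exact code):

def merge_host(existing: dict, new: dict) -> dict:
    merged = dict(existing)
    # Union the ports and append only the sub-documents we haven't seen yet
    merged["ports"] = sorted(set(existing["ports"]) | set(new["ports"]))
    for field in ("banners", "certificates", "headers"):
        merged[field] = existing[field] + [x for x in new[field] if x not in existing[field]]
    # Keep the original first_scanned, bump last_scanned
    merged["last_scanned"] = new["last_scanned"]
    return merged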

I hope this makes sense.

1 Like

Then you need to use the updateOne operation, also with an upsert, within a BulkWrite operation.

Let me try to illustrate with an example in Python.

from datetime import datetime
from pprint import pprint

from faker import Faker
from pymongo import MongoClient, UpdateOne

fake = Faker()


def rand_host():
    return {
        "ip": fake.ipv4(),
        "ports": [rand_port(), rand_port()],
        "banners": [{"banner": "my first banner"}],
        "certificates": [{"cert": "cert 1.0"}],
        "first_scanned": datetime.now(),
        "last_scanned": datetime.now(),
        "headers": [{"header": "some header"}]
    }


def rand_port():
    return fake.pyint(min_value=1, max_value=65535)


if __name__ == '__main__':
    client = MongoClient()
    db = client.get_database('shodan')
    hosts = db.get_collection('hosts')

    # clean the db
    hosts.delete_many({})

    # init my hosts collection with one host
    hosts.create_index("ip", unique=True)
    hosts.insert_one(rand_host())

    print('Print the existing document in my collection:\n')
    init_doc = hosts.find_one()
    pprint(init_doc)
    init_ip = init_doc.get('ip')

    print('\nIP already known:', init_ip)

    print('\nNow I will try to insert 2 new hosts but the first one will have the same IP address as the one already in my collection.')

    hosts.bulk_write([
        # this first document will be updated. $setOnInsert won't do anything as it's not an insert.
        UpdateOne({'ip': init_ip},
                  {'$setOnInsert': {'first_scanned': datetime.now()},
                   '$addToSet': {'ports': {'$each': [rand_port(), rand_port()]},
                                 'banners': {'banner': 'my second banner'},
                                 "certificates": {"cert": "cert 2.0"},
                                 "headers": {"header": "some other header"}
                                 },
                   '$set': {'last_scanned': datetime.now()}
                   }, upsert=True),
        # this ip address doesn't exist in my collection so it's an insert.
        UpdateOne({'ip': fake.ipv4()},
                  {'$setOnInsert': {'first_scanned': datetime.now()},
                   '$addToSet': {'ports': {'$each': [rand_port(), rand_port()]},
                                 'banners': {'banner': 'my second banner'},
                                 "certificates": {"cert": "cert 2.0"},
                                 "headers": {"header": "some other header"}
                                 },
                   '$set': {'last_scanned': datetime.now()}
                   }, upsert=True)
    ], ordered=False)

    print('Final result in my hosts collection:\n')
    for doc in hosts.find():
        pprint(doc)
        print()

Console output:

Print the existing document in my collection:

{'_id': ObjectId('6006fec1f502dc4efd9da90c'),
 'banners': [{'banner': 'my first banner'}],
 'certificates': [{'cert': 'cert 1.0'}],
 'first_scanned': datetime.datetime(2021, 1, 19, 16, 46, 9, 164000),
 'headers': [{'header': 'some header'}],
 'ip': '147.19.133.207',
 'last_scanned': datetime.datetime(2021, 1, 19, 16, 46, 9, 164000),
 'ports': [29658, 6283]}

IP already known: 147.19.133.207

Now I will try to insert 2 new hosts but the first one will have the same IP address as the one already in my collection.
Final result in my hosts collection:

{'_id': ObjectId('6006fec1f502dc4efd9da90c'),
 'banners': [{'banner': 'my first banner'}, {'banner': 'my second banner'}],
 'certificates': [{'cert': 'cert 1.0'}, {'cert': 'cert 2.0'}],
 'first_scanned': datetime.datetime(2021, 1, 19, 16, 46, 9, 164000),
 'headers': [{'header': 'some header'}, {'header': 'some other header'}],
 'ip': '147.19.133.207',
 'last_scanned': datetime.datetime(2021, 1, 19, 16, 46, 9, 165000),
 'ports': [29658, 6283, 27037, 11895]}

{'_id': ObjectId('6006fec10ef49ae99654a648'),
 'banners': [{'banner': 'my second banner'}],
 'certificates': [{'cert': 'cert 2.0'}],
 'first_scanned': datetime.datetime(2021, 1, 19, 16, 46, 9, 166000),
 'headers': [{'header': 'some other header'}],
 'ip': '79.222.50.147',
 'last_scanned': datetime.datetime(2021, 1, 19, 16, 46, 9, 166000),
 'ports': [47533, 38525]}

As you can see in my 2 final documents, the one which already existed in my collection has been updated correctly with the new ports, certificates and headers. The first_scanned field is untouched as it’s not an insert operation (thanks to the $setOnInsert).

The second one is brand new as its IP address was never seen before.

If you don’t want to merge into the document but rather replace it completely, you could maybe use replaceOne? It depends what you want to do exactly.
You could also overwrite some values with $set instead of using $addToSet or $push, which work with arrays and simply append the new values to the array.
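For example (reusing the imports from the script above; the values are just for illustration):

UpdateOne({'ip': init_ip},
          {
              # $set overwrites the whole field, arrays included
              '$set': {'last_scanned': datetime.now()},
              # $addToSet only appends values that are not already in the array
              '$addToSet': {'ports': {'$each': [80, 443]}},
              # $push appends unconditionally, even if the value is already present
              '$push': {'banners': {'banner': 'yet another banner'}}
          },
          upsert=True)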

I hope it helps and this is what you needed :slight_smile:.

Cheers,
Maxime.

PS EDIT: I added ordered=False on the bulk op as it’s more efficient in your scenario and it won’t stop at the first error you might get.

4 Likes

@MaBeuLux88 this worked beautifully, thank you so much! Updating has gone from literal days to a few hundred seconds.

3 Likes

Wow! Thanks so much for this feedback! It makes my day!

From days to seconds, that’s what I call optimization now!

Cheers,
Maxime.

4 Likes

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.