How We Backfilled 2 Million Database Documents
Rate this tutorial
We recently needed to backfill nearly two million documents in our MongoDB database with a new attribute and wanted to share our process. First, some context on why we were doing this: This backfill was to support Netlify's Growth team, which builds prototypes into Netlify's core product and then evaluates how those prototypes impact user conversion and retention rates.
If we find that a prototype positively impacts growth, we use that finding to shape deeper investments in a particular area of the product. In this case, to measure the impact of a prototype, we needed to add an attribute that didn't previously exist to one of our database models.
With that out of the way, let's dive into how we did it!
Backend engineer Eric Betts and I started with a script from a smaller version of this task: backfilling 130,000 documents. The smaller backfill had taken about 11 hours, including time to tweak the script and restart it a few times when it died. At a backfill rate of 175-200 documents per minute, we were looking at a best-case scenario of eight to nine days straight of backfilling for over two million total documents, and that's assuming everything went off without a hitch. With a much bigger backfill ahead of us, we needed to see if we could optimize.
The starting script took two arguments—a
thread_pool_sizesize—and it worked like this:
- Create a new queue.
- Create a variable to store the number of documents we've processed.
- Query the database, limiting returned results to the
batch_sizewe passed in.
- Push each returned document into the queue.
- Create the number of worker threads we passed in with the
- Each thread makes an API call to a third-party API, then writes our new attribute to our database with the result from the third-party API.
- Update our count of documents processed.
- When there are no more documents in the queue to process, clean up the threads.
The script runs on a Kubernetes pod with memory and CPU constraints. It reads from our production MongoDB database and writes to a secondary.
When scaling up the original script to process 20 times the number of documents, we quickly hit some limitations:
Pod memory constraints. Running the script with
batch_sizeof two million documents and
thread_pool_sizeof five was promptly killed by the Kubernetes pod:
Too much manual intervention. Running with
batch_sizeof 100 and
thread_poolof five worked much better:
It ran super fast 🚀 there were no errors ✨... but we would have had to manually run it 20,000 times.
Third-party API rate limits. Even with a reliable
batch_size, we couldn't crank the
thread_pool_sizetoo high or we'd hit rate limits at the third-party API. Our script would finish running, but many of our documents wouldn't actually be backfilled, and we'd have to iterate over them again.
Eric and I needed something that met the following criteria:
- Doesn't use so much memory that it kills the Kubernetes pod.
- Doesn't use so much memory that it noticeably increases database read/write latency.
- Iterates through a complete batch of objects at a time; the job shouldn't die before at least attempting to process a full batch.
- Requires minimal babysitting. Some manual intervention is okay, but we need a job to run for several hours by itself.
- Lets us pick up where we left off. If the job dies, we don't want to waste time re-processing documents we've already processed once.
With this list of criteria, we started brainstorming solutions. We could:
- Dig into why the script was timing out before processing the full batch.
- Store references to documents that failed to be updated, and loop back over them later.
- Find a way to order the results returned by the database.
- Automatically add more jobs to the queue once the initial batch was processed.
#1 was an obvious necessity. We started logging the thread index to see if it would tell us anything:
This new log line let us see threads die off as the script ran. We'd go from running with five threads:
to running with a few:
to running with none.
We realized that when a thread would hit an error in an API request or a write to our database, we were rescuing and printing the error, but not continuing with the loop. This was a simple fix: When we
rescue, continue to the
nextiteration of the loop.
In a new run of the script, we needed a way to pick up where we left off. Idea #2—keeping track of failures across iterations of the script—was technically possible, but it wasn't going to be pretty. We expected idea #3—ordering the query results—to solve the same problem, but in a better way, so we went with that instead. Eric came up with the idea to order our query results by
created_atdate. This way, we could pass a
not_beforedate argument when running the script to ensure that we weren't processing already-processed objects. We could also print each document's
created_atdate as it was processed, so that if the script died, we could grab that date and pass it into the next run. Here's what it looked like:
So a log line might look like:
thread="6" obj="979apca..." created_at="Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00"
And if the script died after that line, we could grab that date and pass it back in:
Backfill.run(50000, 10, "Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00")
Unfortunately, when we added the ordering, we found that we unintentionally introduced a new memory limitation: the query results were sorted in memory, so we couldn't pass in too large of a batch size or we'd run out of memory on the Kubernetes pod. This lowered our batch size substantially, but we accepted the tradeoff since it eliminated the possibility of redoing work that had already been done.
The last critical task was to make our queue add to itself once the original batch of documents was processed.
Our first approach was to check the queue size, add more objects to the queue when queue size reached some threshold, and re-run the original query, but skip all the returned query results that we'd already processed. We stored the number we'd already processed in a variable called
skip_value. Each time we added to the queue, we would increase
skip_valueand skip an increasingly large number of results.
You can tell where this is going. At some point, we would try to skip too large of a value, run out of memory, fail to refill the queue, and the job would die.
So, with our existing loop to create and kick off the threads, we have something like this:
With this, we were finally getting the queue to add to itself when it was done. But the first time we ran this, we saw something surprising. The initial batch of 50,000 documents was processed quickly, and then the next batch that was added by our self-adding queue was processed very slowly. We ran
top -Hto check CPU and memory usage of our script on the Kubernetes pod and saw that it was using 90% of the system's CPU:
With these optimizations ironed out, Eric and I were able to complete our backfill at a processing rate of 800+ documents/minute with no manual intervention. Woohoo!
Currency Analysis with Time Series Collections #2 — Simple Moving Average and Exponential Moving Average Calculation
May 16, 2022