
How We Backfilled 2 Million Database Documents

Updated: Mar 15, 2022 | Published: Mar 15, 2022

By Jen Kagan and Eric Betts

We recently needed to backfill nearly two million documents in our MongoDB database with a new attribute and wanted to share our process. First, some context on why we were doing this: This backfill was to support Netlify's Growth team, which builds prototypes into Netlify's core product and then evaluates how those prototypes impact user conversion and retention rates.

If we find that a prototype positively impacts growth, we use that finding to shape deeper investments in a particular area of the product. In this case, to measure the impact of a prototype, we needed to add an attribute that didn't previously exist to one of our database models.

With that out of the way, let's dive into how we did it!

Backend engineer Eric Betts and I started with a script from a smaller version of this task: backfilling 130,000 documents. The smaller backfill had taken about 11 hours, including time to tweak the script and restart it a few times when it died. At a backfill rate of 175-200 documents per minute, we were looking at a best-case scenario of eight to nine days straight of backfilling for over two million total documents, and that's assuming everything went off without a hitch. With a much bigger backfill ahead of us, we needed to see if we could optimize.

The starting script took two arguments, a batch_size and a thread_pool_size, and it worked like this:

  1. Create a new queue.
  2. Create a variable to store the number of documents we've processed.
  3. Query the database, limiting returned results to the batch_size we passed in.
  4. Push each returned document into the queue.
  5. Create the number of worker threads we passed in with the thread_pool_size argument.
  6. Each thread makes an API call to a third-party API, then writes our new attribute to our database with the result from the third-party API.
  7. Update our count of documents processed.
  8. When there are no more documents in the queue to process, clean up the threads.

The script runs on a Kubernetes pod with memory and CPU constraints. It reads from our production MongoDB database and writes to a secondary.
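The steps above can be sketched as a toy, self-contained Ruby version of the pattern: an in-memory array of fake documents stands in for the MongoDB query, and the worker body stands in for the third-party API call plus the database write (names like `backfill` are ours, not the real script's).

```ruby
def backfill(batch_size, thread_pool_size, docs)
  jobs = Queue.new              # 1. create a new queue
  processed = 0                 # 2. count of documents processed
  mutex = Mutex.new

  # 3-4. "query" a batch and push each document into the queue
  docs.take(batch_size).each { |d| jobs.push(d) }

  # 5. create the worker threads
  workers = thread_pool_size.times.map do
    Thread.new do
      loop do
        j = jobs.pop(true) rescue break  # non-blocking pop; stop when drained
        j[:new_attr] = "backfilled"      # 6. stand-in for API call + write
        mutex.synchronize { processed += 1 } # 7. update the count
      end
    end
  end

  workers.each(&:join) # 8. clean up the threads
  processed
end
```

Note the mutex around the counter: with multiple writer threads, even a simple increment needs synchronization to stay accurate.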

# More repos, more problems

When scaling up the original script to process 20 times the number of documents, we quickly hit some limitations:

Pod memory constraints. Running the script with a batch_size of two million documents and a thread_pool_size of five was promptly killed by the Kubernetes pod:

```ruby
Backfill.run(2000000, 5)
```

Too much manual intervention. Running with a batch_size of 100 and a thread_pool_size of five worked much better:

```ruby
Backfill.run(100, 5)
```

It ran super fast 🚀 and there were no errors ✨... but we would have had to run it manually 20,000 times.

Third-party API rate limits. Even with a reliable batch_size, we couldn't crank the thread_pool_size too high or we'd hit rate limits at the third-party API. Our script would finish running, but many of our documents wouldn't actually be backfilled, and we'd have to iterate over them again.
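One standard way to cope with this is to retry rate-limited calls with exponential backoff. The article doesn't say how (or whether) the real script retried, so the sketch below is a hypothetical: `RateLimitError` and the yielded block are stand-ins for the actual API client.

```ruby
# Hypothetical error raised when the third-party API reports a rate limit.
class RateLimitError < StandardError; end

# Retry the block with exponentially growing waits between attempts.
def with_backoff(max_retries: 5, base_delay: 1)
  attempts = 0
  begin
    yield
  rescue RateLimitError
    attempts += 1
    raise if attempts > max_retries
    sleep(base_delay * (2**(attempts - 1))) # wait 1s, 2s, 4s, ...
    retry
  end
end
```

Wrapping the per-document API call this way trades throughput for completeness: the run slows down at the limit instead of silently leaving documents un-backfilled.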

# Brainstorming solutions

Eric and I needed something that met the following criteria:

  • Doesn't use so much memory that it kills the Kubernetes pod.
  • Doesn't use so much memory that it noticeably increases database read/write latency.
  • Iterates through a complete batch of objects at a time; the job shouldn't die before at least attempting to process a full batch.
  • Requires minimal babysitting. Some manual intervention is okay, but we need a job to run for several hours by itself.
  • Lets us pick up where we left off. If the job dies, we don't want to waste time re-processing documents we've already processed once.

With this list of criteria, we started brainstorming solutions. We could:

  1. Dig into why the script was timing out before processing the full batch.
  2. Store references to documents that failed to be updated, and loop back over them later.
  3. Find a way to order the results returned by the database.
  4. Automatically add more jobs to the queue once the initial batch was processed.

# Optimizations

# You're in time out

Idea #1 was an obvious necessity. We started logging the thread index to see if it would tell us anything:

```ruby
def self.run(batch_size, thread_pool_size)
  jobs = Queue.new

  # get all the documents that meet these criteria
  objs = Obj.where(...)
  # limit the returned objects to the batch_size
  objs = objs.limit(batch_size)
  # push each document into the jobs queue to be processed
  objs.each { |o| jobs.push o }

  # create a thread pool
  workers = thread_pool_size.times.map do |i|
    Thread.new do
      begin
        while j = jobs.pop(true)
          # log the thread index and object ID
          Rails.logger.with_fields(thread: i, obj: j.id)
          begin
            # process objects
          end
  ...
```

This new log line let us see threads die off as the script ran. We'd go from running with five threads:

```
thread="4" obj="939bpca..."
thread="1" obj="939apca..."
thread="5" obj="939cpca..."
thread="2" obj="939dpca..."
thread="3" obj="939fpca..."
thread="4" obj="969bpca..."
thread="1" obj="969apca..."
thread="5" obj="969cpca..."
thread="2" obj="969dpca..."
thread="3" obj="969fpca..."
```

to running with a few:

```
thread="4" obj="989bpca..."
thread="1" obj="989apca..."
thread="4" obj="979bpca..."
thread="1" obj="979apca..."
```

to running with none.

We realized that when a thread would hit an error in an API request or a write to our database, we were rescuing and printing the error, but not continuing with the loop. This was a simple fix: When we rescue, continue to the next iteration of the loop.

```ruby
begin
  # process documents
rescue
  next
end
```

# Order, order

In a new run of the script, we needed a way to pick up where we left off. Idea #2—keeping track of failures across iterations of the script—was technically possible, but it wasn't going to be pretty. We expected idea #3—ordering the query results—to solve the same problem, but in a better way, so we went with that instead. Eric came up with the idea to order our query results by created_at date. This way, we could pass a not_before date argument when running the script to ensure that we weren't processing already-processed objects. We could also print each document's created_at date as it was processed, so that if the script died, we could grab that date and pass it into the next run. Here's what it looked like:

```ruby
def self.run(batch_size, thread_pool_size, not_before)
  jobs = Queue.new

  # order the query results in ascending order
  objs = Obj.where(...).order(created_at: 1)
  # get documents created after the not_before date
  objs = objs.where(:created_at.gte => not_before)
  # limit the returned documents to the batch_size
  objs = objs.limit(batch_size)
  # push each document into the jobs queue to be processed
  objs.each { |o| jobs.push o }

  workers = thread_pool_size.times.map do |i|
    Thread.new do
      begin
        while j = jobs.pop(true)
          # log each document's created_at date as it's processed
          Rails.logger.with_fields(thread: i, obj: j.id, created_at: j.created_at)
          begin
            # process documents
          rescue
            next
          end
  ...
```

So a log line might look like:

```
thread="6" obj="979apca..." created_at="Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00"
```

And if the script died after that line, we could grab that date and pass it back in:

```ruby
Backfill.run(50000, 10, "Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00")
```

Nice!
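If you wanted to script the restart instead of copying the date by hand, the timestamp is easy to pull out of the log line. This helper is hypothetical (the article's authors pasted the string manually); it just assumes the log format shown above.

```ruby
require "time"

# Hypothetical helper: extract the created_at value from a log line so a
# wrapper script could restart the backfill from where it died.
def last_created_at_from(log_line)
  log_line[/created_at="([^"]+)"/, 1]
end

line = 'thread="6" obj="979apca..." created_at="Wed, 11 Nov 2020 02:04:11.891000000 UTC +00:00"'
stamp = last_created_at_from(line)
# Backfill.run(50000, 10, stamp) would then resume from that document,
# or Time.parse(stamp) if the caller wants a real Time object.
```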

Unfortunately, when we added the ordering, we found that we unintentionally introduced a new memory limitation: the query results were sorted in memory, so we couldn't pass in too large of a batch size or we'd run out of memory on the Kubernetes pod. This lowered our batch size substantially, but we accepted the tradeoff since it eliminated the possibility of redoing work that had already been done.
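For what it's worth, MongoDB only sorts in memory when no index covers the sort key, so an index on created_at would let the server return documents in sorted order by walking the index instead. We don't know whether the team's model had one or why adding one wasn't an option, so this Mongoid sketch is purely illustrative:

```ruby
class Obj
  include Mongoid::Document

  field :created_at, type: Time

  # An ascending index on created_at supports .order(created_at: 1)
  # without an in-memory sort on the server.
  index({ created_at: 1 }, { background: true })
end
```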

# The job is never done

The last critical task was to make our queue add to itself once the original batch of documents was processed.

Our first approach was to check the queue size, add more objects to the queue when queue size reached some threshold, and re-run the original query, but skip all the returned query results that we'd already processed. We stored the number we'd already processed in a variable called skip_value. Each time we added to the queue, we would increase skip_value and skip an increasingly large number of results.

You can tell where this is going. At some point, we would try to skip too large of a value, run out of memory, fail to refill the queue, and the job would die.

```ruby
skip_value = batch_size
step = batch_size

loop do
  if jobs.size < 1000
    objs = Obj.where(...).order(created_at: 1)
    objs = objs.where(:created_at.gte => not_before)
    objs = objs.skip(skip_value).limit(step) # <--- job dies when this skip_value gets too big ❌
    objs.each { |r| jobs.push r }

    skip_value += step # <--- this keeps increasing as we process more objects ❌

    if objs.count == 0
      break
    end
  end
end
```

We ultimately tossed out the increasing skip_value, opting instead to store the created_at date of the last object processed. This way, we could skip a constant, relatively low number of documents instead of slowing down and eventually killing our query by skipping an increasing number:

```ruby
refill_at = 1000
step = batch_size

loop do
  if jobs.size < refill_at
    objs = Obj.where(...).order(created_at: 1)
    objs = objs.where(:created_at.gte => last_created_at) # <--- grab last_created_at, set earlier in the script ✅
    objs = objs.skip(refill_at).limit(step) # <--- skip a constant amount ✅
    objs.each { |r| jobs.push r }

    if objs.count == 0
      break
    end
  end
end
```

So, with our existing loop to create and kick off the threads, we have something like this:

```ruby
def self.run(batch_size, thread_pool_size, not_before)
  jobs = Queue.new

  objs = Obj.where(...).order(created_at: 1)
  objs = objs.where(:created_at.gte => not_before)
  objs = objs.limit(batch_size)
  objs.each { |o| jobs.push o }

  updated = 0
  last_created_at = not_before # <--- we update this variable...

  workers = thread_pool_size.times.map do |i|
    Thread.new do
      begin
        while j = jobs.pop(true)
          Rails.logger.with_fields(thread: i, obj: j.id, created_at: j.created_at)
          begin
            # process documents
            updated += 1
            last_created_at = j.created_at # <--- ...with each document processed
          rescue
            next
          end
        end
      end
    end
  end

  refill_at = 1000
  step = batch_size

  loop do
    if jobs.size < refill_at
      objs = Obj.where(...).order(created_at: 1)
      objs = objs.where(:created_at.gte => last_created_at)
      objs = objs.skip(refill_at).limit(step)

      objs.each { |r| jobs.push r }

      if objs.count == 0
        break
      end
    end
  end
  workers.map(&:join)
end
```

With this, we were finally getting the queue to add to itself when it was done. But the first time we ran this, we saw something surprising. The initial batch of 50,000 documents was processed quickly, and then the next batch that was added by our self-adding queue was processed very slowly. We ran top -H to check CPU and memory usage of our script on the Kubernetes pod and saw that it was using 90% of the system's CPU:

![Output of top -H showing the backfill process using 90% CPU](https://mongodb-devhub-cms.s3.us-west-1.amazonaws.com/output_showing_cpu_usage_f67af92926.png)

Adding a few sleep statements between loop iterations helped us get CPU usage down to a very reasonable 6% for the main process.
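A minimal illustration of that fix: sleeping between refill checks keeps the main loop from spinning at full CPU while the workers drain the queue. Here `jobs` and the refill body are stand-ins for the real script's query and queue.

```ruby
jobs = Queue.new
refill_at = 2
refills = 0

3.times do
  if jobs.size < refill_at
    3.times { jobs.push(:doc) } # stand-in for re-querying the database
    refills += 1
  end
  jobs.pop until jobs.empty?    # stand-in for the workers draining the queue
  sleep 0.01                    # <-- yield the CPU instead of busy-looping
end
```

Without that `sleep`, the `loop do ... end` runs the `jobs.size` check as fast as the CPU allows, which is where the 90% usage was coming from.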

With these optimizations ironed out, Eric and I were able to complete our backfill at a processing rate of 800+ documents/minute with no manual intervention. Woohoo!

© 2021 MongoDB, Inc.
