How to scan a collection in parallel from multiple clients?

I would like to run a batch operation that does something to every document in a large collection. The batch will be run using multiple workers, each with an independent connection to MongoDB.

Each batch worker knows the total number of workers (N) and its index (I). So, N might be 4, and then there will be worker 0, 1, 2 and 3.

I’d like to query a subset of the documents in the collection from each worker.

I have a few ideas about how this might be accomplished:

1. Populate every document in the collection with a random positive integer, and then do something like randomInt: {$mod: [N, I]}. The downside here is that we have to perform a migration to assign a randomInt to every doc, and modify our application logic to make sure the field is set at creation time and preserved during updates.

2. If N is 16 (or 8, or 4), I could do something like db.activityMeta.count({$where: "hex_md5(this._id.toString())[23] == 'a'"}) (use md5 to hash the _id and then use $where to partition by a single hex character of the hash). The downside here is that this is quite awkward for other values of N, and the $where clause cannot use the index, so it has to evaluate the condition on every doc for every worker (thus, on the db side, doing N simultaneous collection scans).

3. Do a pre-emptive $bucketAuto aggregation, then pass an _id range to each individual worker. I think this adds a considerable amount of complexity: running the initial query, passing the ranges to the workers, and deciding what happens to documents that are inserted after the initial query.
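For what it's worth, option 3 can be sketched roughly as below. This is only a sketch under assumptions: the names `Bucket`, `bucketPipeline` and `rangeFilters` are made up for illustration, and it assumes the `$bucketAuto` output shape `{ _id: { min, max }, count }`, where `max` is exclusive for every bucket except the last one.

```typescript
// Shape of one $bucketAuto result document when grouping by _id.
interface Bucket<T> {
  _id: { min: T; max: T };
  count: number;
}

// Pipeline asking the server for `numWorkers` roughly equal-sized _id ranges.
function bucketPipeline(numWorkers: number): object[] {
  return [{ $bucketAuto: { groupBy: "$_id", buckets: numWorkers } }];
}

// Turn the bucket boundaries into one find() filter per worker.
// `max` is treated as exclusive, except in the last bucket where it is inclusive.
function rangeFilters<T>(buckets: Bucket<T>[]): object[] {
  return buckets.map((bucket, i) => {
    const isLast = i === buckets.length - 1;
    return {
      _id: isLast
        ? { $gte: bucket._id.min, $lte: bucket._id.max }
        : { $gte: bucket._id.min, $lt: bucket._id.max },
    };
  });
}
```

Worker I would then run find() with the I-th filter. If _ids only ever increase, dropping the upper bound on the last filter would be one way to also pick up documents inserted after the aggregation ran.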

I am curious if I am missing any other ways to accomplish this? Ideally I’d be able to do something like create a cursor for “hash(_id) % N == I”, but in a way that could leverage the _id index, without having to modify documents in the collection.

Turns out there is another option - $expr!

Assuming that your document _ids are uniformly distributed across seconds, you can select a subset of them like so:

mongoClient
    .db()
    .collection('collection')
    .find({
      $expr: {
        $eq: [
          {
            $mod: [
              {
                // ObjectId timestamps are precise to the second, so in ms they are all multiples of 1000
                // let's divide by 1000 to get seconds
                $divide: [
                  {
                    // $toLong on a Date returns ms since epoch
                    $toLong: {
                      // $toDate on an objectId returns its timestamp
                      $toDate: '$_id'
                    }
                  },
                  1000
                ]
              },
              NUM_WORKERS
            ]
          },
          WORKER_INDEX
        ]
      }
    });
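To sanity-check which worker a given _id would land on, the same arithmetic can be reproduced client-side. This is a minimal sketch, assuming _ids are standard ObjectIds whose first 4 bytes (8 hex characters) hold the creation time in seconds; `objectIdSeconds` and `workerFor` are hypothetical helper names, not driver APIs.

```typescript
// Seconds since the Unix epoch, read from the first 8 hex characters of an ObjectId.
function objectIdSeconds(hex: string): number {
  if (!/^[0-9a-fA-F]{24}$/.test(hex)) {
    throw new Error("not a 24-character hex ObjectId: " + hex);
  }
  return parseInt(hex.slice(0, 8), 16);
}

// Index of the worker that the $expr filter above would assign this _id to.
function workerFor(hex: string, numWorkers: number): number {
  return objectIdSeconds(hex) % numWorkers;
}
```

For example, with 4 workers, `workerFor('38fd2f400000000000000000', 4)` picks worker 0, and an _id created one second later goes to worker 1.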

I am not totally sure what the performance of this is with respect to disk. I hope that $expr can recognize that it only needs the _id field and read it from memory, rather than retrieving every (full) document from disk. I’m clarifying that with MongoDB support.

One important thing to know is that when a document is updated it is eventually written in whole to disk. See Will NaN entries consume much space? - #4 by Stennie_X.

I wanted to bring that up because a big migration like this (doing something to every document) is most likely disk I/O intensive.

So updating each document with a random number, and then using this number to assign a worker, will result in writing the document twice: once for the random number update and once for the update by the worker.

As for leveraging _id with the find() from your 2nd post, you will need to project, because find() returns the whole document by default, so the whole document will need to be read.

If this batch operation is a one-time thing, you might consider leveraging the flexible schema nature of MongoDB and only migrate documents when they are needed by another single-document update use case. The following is useful: Building with Patterns: The Schema Versioning Pattern | MongoDB Blog.

To create batches you can always use sort().skip().limit(), more or less like paging. Example, with 4 workers:

// n = batch number (0, 1, 2, ...), m = this worker's number (0 to number_of_workers - 1)
batch_number = n
batch_size = 1000
worker_number = m
number_of_workers = 4
skipped_documents = ( number_of_workers * batch_number + worker_number ) * batch_size;
// skipped_documents = (4 * n + m) * 1000
// c is the collection; the projection keeps only _id
batched_documents =
  c.find( {}, {_id:1} ).
     sort( {_id:1} ).
     skip( skipped_documents ).
     limit( batch_size )

Documents will be batched like
batch 0 worker 0 updates document 0 to batch_size - 1
batch 0 worker 1 updates document 1 * batch_size to 2 * batch_size - 1
batch 0 worker 2 updates document 2 * batch_size to 3 * batch_size - 1
batch 0 worker 3 updates document 3 * batch_size to 4 * batch_size - 1
batch 1 worker 0 updates document 4 * batch_size to 5 * batch_size - 1
batch n worker m updates document ( 4 * n + m ) * batch_size to ( 4 * n + m + 1 ) * batch_size - 1
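The batching rule above can be checked without a database. The following is a small sketch, assuming documents are identified by their position in _id order; `skipFor` and `assignPositions` are illustrative names only.

```typescript
// Number of documents to skip for a given batch and worker:
// (number_of_workers * batch_number + worker_number) * batch_size
function skipFor(batch: number, worker: number, numWorkers: number, batchSize: number): number {
  return (numWorkers * batch + worker) * batchSize;
}

// For every document position, record which worker would process it.
// Throws if a position would be handed to two workers.
function assignPositions(totalDocs: number, numWorkers: number, batchSize: number): number[] {
  const owner: number[] = [];
  for (let pos = 0; pos < totalDocs; pos++) {
    owner.push(-1); // -1 means "not assigned yet"
  }
  for (let worker = 0; worker < numWorkers; worker++) {
    for (let batch = 0; ; batch++) {
      const start = skipFor(batch, worker, numWorkers, batchSize);
      if (start >= totalDocs) break;
      const end = Math.min(start + batchSize, totalDocs);
      for (let pos = start; pos < end; pos++) {
        if (owner[pos] !== -1) throw new Error("position " + pos + " assigned twice");
        owner[pos] = worker;
      }
    }
  }
  return owner;
}
```

With 10 documents, 4 workers and a batch size of 2, positions 0-1 go to worker 0, 2-3 to worker 1, 4-5 to worker 2, 6-7 to worker 3, and 8-9 to worker 0 again.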

Support got back to me. Unfortunately, even when $expr is used, if one performs the query with .explain and includes executionStats, the stats still show totalDocsExamined as being every document in the collection. So $expr is not effectively using the index in this case.

Thanks! I was able to adopt this strategy to get a solution:

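  // _id of the last document in this worker's previous batch
  let lastProcessedId = undefined;

  let count = 0;
  while (true) {
    const docs: any[] = await collection
      .find(lastProcessedId ? { _id: { $gt: lastProcessedId } } : {})
      .sort({ _id: 1 })
      .skip(
        lastProcessedId
          ? // later batches: skip the batches that belong to the other workers
            BATCH_SIZE * (NUM_WORKERS - 1)
          : // first batch: skip the batches of the workers with a lower index
            BATCH_SIZE * WORKER_IDX
      )
      .limit(BATCH_SIZE)
      .project({ _id: 1 })
      .toArray();

    count += docs.length;
    if (docs.length === BATCH_SIZE) {
      lastProcessedId = docs[BATCH_SIZE - 1]._id;
    } else {
      // a short (or empty) batch means the end of the collection was reached
      break;
    }
  }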

I confirmed that this results in correct execution stats:

db.collection.explain("executionStats").find({_id: {$gt: ObjectId('38fd2f400000000000000000')}}).skip(10).limit(10);
...
totalKeysExamined: 20
totalDocsExamined: 10
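If you want to check this from a script rather than by eye, one rough heuristic is to compare the documents examined with the documents returned. This is a sketch only: `fetchesOnlyWhatItReturns` is a made-up helper, the rule of thumb is my assumption rather than an official one, and it relies on the `nReturned`, `totalKeysExamined` and `totalDocsExamined` fields of the executionStats section.

```typescript
// The subset of the executionStats section that this check looks at.
interface ExecutionStats {
  nReturned: number;
  totalKeysExamined: number;
  totalDocsExamined: number;
}

// Heuristic (an assumption): the query is not scanning the collection
// if it fetches no more documents than it returns.
function fetchesOnlyWhatItReturns(stats: ExecutionStats): boolean {
  return stats.totalDocsExamined <= stats.nReturned;
}
```

The stats above (10 documents examined for 10 returned) pass this check, whereas the $expr query, which examined every document in the collection, would not.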

Assuming that documents are only inserted in monotonically increasing order, this should get all of them (with a bit of weird concurrency / edge-behavior for the last batch, though this isn’t super important for my use case).
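The coverage claim can be tested in memory. Below is a minimal sketch that replaces the collection with a sorted array of numeric ids (an assumption made purely for illustration; `page` and `idsForWorker` are hypothetical names). Note that after its first batch a worker has to skip the batches of the other NUM_WORKERS - 1 workers to land on its own next batch.

```typescript
// In-memory stand-in for:
// find({_id: {$gt: lastId}}).sort({_id: 1}).skip(skip).limit(limit)
function page(sortedIds: number[], lastId: number | undefined, skip: number, limit: number): number[] {
  let from = 0;
  if (lastId !== undefined) {
    while (from < sortedIds.length && sortedIds[from] <= lastId) from++;
  }
  return sortedIds.slice(from + skip, from + skip + limit);
}

// All ids that one worker would visit with the keyset-plus-skip loop.
function idsForWorker(sortedIds: number[], workerIdx: number, numWorkers: number, batchSize: number): number[] {
  const seen: number[] = [];
  let lastId: number | undefined = undefined;
  while (true) {
    const skip: number = lastId === undefined
      ? batchSize * workerIdx          // first batch: pass over lower-indexed workers
      : batchSize * (numWorkers - 1);  // later batches: pass over the other workers
    const docs: number[] = page(sortedIds, lastId, skip, batchSize);
    for (const id of docs) seen.push(id);
    if (docs.length < batchSize) break; // short or empty batch: end of collection
    lastId = docs[batchSize - 1];
  }
  return seen;
}
```

Collecting the output of `idsForWorker` for every worker index and sorting it should give back exactly the original list of ids, as long as nothing is inserted or deleted while the workers run.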

