$function & $where usage for a MR query

Hello team,

How can I implement the logic below in MongoDB 4.4 with $function, $expr, or $where? Is there an option to define a global variable that can be used across multiple records in version 4.4? Below is the example I tried with MR (map-reduce) in version 4.2.

Since MR is deprecated starting with v5.0, I’m trying to migrate the logic below to a JavaScript function in v4.4. I couldn’t find a way to implement this use case with the aggregation framework without first adding the transactions to an array. The problem with that idea is that it could run into the 16 MB limitation if the volume of participating transactions is high.

data set

// Sample data: 15 places, one per userid ("a" through "o").
db.places.insertMany([
  { userid: "a", location: "Bishan" },
  { userid: "b", location: "Bukit Timah" },
  { userid: "c", location: "Ang Mo Kio" },
  { userid: "d", location: "Segar" },
  { userid: "e", location: "Fajar" },
  { userid: "f", location: "dover" },
  { userid: "g", location: "Buona Vista" },
  { userid: "h", location: "Marina Bay" },
  { userid: "i", location: "Rocher" },
  { userid: "j", location: "down town" },
  { userid: "k", location: "Jurong" },
  { userid: "l", location: "Pungol" },
  { userid: "m", location: "One North" },
  { userid: "n", location: "Cho Chu Kang" },
  { userid: "o", location: "Yishun" }
]);

The MR below picks up all records in the collection and classifies them into multiple batches, i.e., 5 records in each batch.

Global variables:

// Shared state read and updated by the map function below.
var threshold = 5; // maximum number of records per batch
var batch = 1;     // number of the batch currently being filled
var counter = 0;   // records already placed in the current batch

Map Function:

// Map step: emits one { location, batch } value per document, keyed by userid.
// Reads and updates the shared counter / batch / threshold variables.
var mapFunction = function() {
    // The current batch is full: move on to the next one and restart the count.
    if (counter >= threshold) {
        batch += 1;
        counter = 0;
    }
    counter += 1;

    emit(this.userid, { location: this.location, batch: batch });
};

Reduce Function:

// Reduce step: intentionally empty, so it returns undefined for every key.
// NOTE(review): presumably never invoked for this data set because each userid
// is emitted exactly once by the map function — confirm before reusing with
// keys that can repeat.
var reduceFunction = function(key, value) {
};

MR

// Run the map-reduce over "places" and write the output to "places_RV".
db.places.mapReduce(mapFunction, reduceFunction, {
  out: "places_RV",
  // Pass the current values of the shell variables into the map-reduce scope.
  scope: { batch, counter, threshold }
})

Results

MongoDB Enterprise > db.places_RV.find().sort( { _id: 1 } )
{ "_id" : "a", "value" : { "location" : "Bishan", "batch" : 1 } }
{ "_id" : "b", "value" : { "location" : "Bukit Timah", "batch" : 1 } }
{ "_id" : "c", "value" : { "location" : "Ang Mo Kio", "batch" : 1 } }
{ "_id" : "d", "value" : { "location" : "Segar", "batch" : 1 } }
{ "_id" : "e", "value" : { "location" : "Fajar", "batch" : 1 } }
{ "_id" : "f", "value" : { "location" : "dover", "batch" : 2 } }
{ "_id" : "g", "value" : { "location" : "Buona Vista", "batch" : 2 } }
{ "_id" : "h", "value" : { "location" : "Marina Bay", "batch" : 2 } }
{ "_id" : "i", "value" : { "location" : "Rocher", "batch" : 2 } }
{ "_id" : "j", "value" : { "location" : "down town", "batch" : 2 } }
{ "_id" : "k", "value" : { "location" : "Jurong", "batch" : 3 } }
{ "_id" : "l", "value" : { "location" : "Pungol", "batch" : 3 } }
{ "_id" : "m", "value" : { "location" : "One North", "batch" : 3 } }
{ "_id" : "n", "value" : { "location" : "Cho Chu Kang", "batch" : 3 } }
{ "_id" : "o", "value" : { "location" : "Yishun", "batch" : 3 } }

Regards,
Rama

Hello, @Laks !

I have come up with 3 solutions for your case :wink::

Solution 1. $accumulator inside $group
Uses custom js-code to calculate batch number for a given document.

// Solution 1. $accumulator inside $group
db.places.aggregate([
  {
    // Put every document into a single group so that one accumulator state
    // sees the whole collection.
    $group: {
      _id: null,
      result: {
        $accumulator: {
          // State shape:
          //   i       - number of documents accumulated so far
          //   batchN  - batch number assigned to the latest document
          //   batches - output entries ({ userid, value }) built so far
          init: function() {
            return { i: 0, batchN: 0, batches: [] };
          },
          accumulateArgs: [{
            userid: '$userid',
            location: '$location',
          }],
          // Appends one entry for the incoming document and assigns it the
          // batch number derived from its 1-based position.
          accumulate: function(state, arg) {
            const maxBatchSize = 5; // max documents per batch constant

            const currentI = state.i + 1;
            const currentBatchN = Math.ceil(currentI / maxBatchSize);

            const currentBatches = state.batches.concat({
              userid: arg.userid,
              value: {
                location: arg.location,
                batch: currentBatchN
              }
            });

            return {
              i: currentI,
              batchN: currentBatchN,
              batches: currentBatches
            }
          },
          // Combines two partial states. The server may call this (for example
          // on a sharded cluster or when the operation spills to disk), so it
          // must keep every entry: entries of state2 are appended after those
          // of state1 and renumbered according to their new position.
          merge: function(state1, state2) {
            const maxBatchSize = 5; // must match the constant used in accumulate

            const offset = state1.i;
            const renumbered = state2.batches.map(function(entry, index) {
              return {
                userid: entry.userid,
                value: {
                  location: entry.value.location,
                  batch: Math.ceil((offset + index + 1) / maxBatchSize)
                }
              };
            });

            const total = state1.i + state2.i;
            return {
              i: total,
              batchN: Math.ceil(total / maxBatchSize),
              batches: state1.batches.concat(renumbered)
            };
          },
          lang: 'js',
        }
      }
    },
  },
  {
    // One output document per accumulated entry
    $unwind: '$result.batches',
  },
  {
    // Reshape to { _id: <userid>, value: { location, batch } }
    $project: {
      _id: '$result.batches.userid',
      value: '$result.batches.value',
    }
  }
]);

Solution 2. $reduce after $group
This solution works similar to the previous one, but it does not use custom js-code, so it should work faster.

// Solution 2. $group and then $reduce
db.places.aggregate([
  // Stage 1: collect the userid/location of every document into one array.
  {
    $group: {
      _id: null,
      batchesWithoutNo: {
        $push: { userid: '$userid', location: '$location' },
      },
    },
  },
  // Stage 2: fold over that array, tagging each entry with its batch number.
  {
    $project: {
      result: {
        $reduce: {
          input: '$batchesWithoutNo',
          initialValue: {
            maxBatchSize: 5, // max documents per batch constant
            i: 0, // number of entries processed so far
            batches: [],
          },
          in: {
            $let: {
              // Batch number of the current entry: ceil((i + 1) / maxBatchSize)
              vars: {
                batchNo: {
                  $ceil: {
                    $divide: [{ $add: ['$$value.i', 1] }, '$$value.maxBatchSize'],
                  },
                },
              },
              in: {
                maxBatchSize: '$$value.maxBatchSize',
                i: { $add: ['$$value.i', 1] },
                batchN: '$$batchNo',
                batches: {
                  $concatArrays: [
                    '$$value.batches',
                    [
                      {
                        userid: '$$this.userid',
                        value: { location: '$$this.location', batch: '$$batchNo' },
                      },
                    ],
                  ],
                },
              },
            },
          },
        },
      },
    },
  },
  // Stage 3: one output document per array entry.
  { $unwind: '$result.batches' },
  // Stage 4: reshape to { _id: <userid>, value: { location, batch } }.
  {
    $project: {
      _id: '$result.batches.userid',
      value: '$result.batches.value',
    },
  },
]);

Both solutions above work as expected, but since they use $group stage, they might hit 16MB BSON-document size limit.

If you decide to stick with one of those, and the 16MB limitation is a real problem for you, check whether it is possible to run these pipelines with $match + $limit stages, so that the whole collection is processed in a few runs. Note that you will then have to distinguish documents that have already been processed by your aggregation pipeline from those that have not. This may, for example, involve adding an additional boolean field to your documents.

Solution 3. $bucketAuto.
Since you need to distribute your documents between batches, I would suggest looking at the [$bucketAuto](https://www.mongodb.com/docs/manual/reference/operator/aggregation/bucketAuto/) pipeline stage.

Given your example dataset, running the following pipeline:

// Solution 3. $bucketAuto
let batchGroups = db.places.aggregate([
  {
    // Distribute the documents across 3 buckets, keyed on _id
    $bucketAuto: {
      groupBy: '$_id',
      buckets: 3,
      output: {
        // Each bucket collects the { userid, location } pairs of its documents
        batches: { $addToSet: { userid: '$userid', location: '$location' } },
      },
    },
  },
]).toArray();

Would produce the following results:

[
  {
    _id: {
      min: ObjectId("64e5cc029286257cdfcc4ddf"),
      max: ObjectId("64e5cc029286257cdfcc4de4")
    },
    batches: [
      { userid: 'b', location: 'Bukit Timah' },
      { userid: 'a', location: 'Bishan' },
      { userid: 'd', location: 'Segar' },
      { userid: 'c', location: 'Ang Mo Kio' },
      { userid: 'e', location: 'Fajar' }
    ]
  },
  {
    _id: {
      min: ObjectId("64e5cc029286257cdfcc4de4"),
      max: ObjectId("64e5cc029286257cdfcc4de9")
    },
    batches: [
      { userid: 'f', location: 'dover' },
      { userid: 'i', location: 'Rocher' },
      { userid: 'h', location: 'Marina Bay' },
      { userid: 'j', location: 'down town' },
      { userid: 'g', location: 'Buona Vista' }
    ]
  },
  {
    _id: {
      min: ObjectId("64e5cc029286257cdfcc4de9"),
      max: ObjectId("64e5cc029286257cdfcc4ded")
    },
    batches: [
      { userid: 'l', location: 'Pungol' },
      { userid: 'm', location: 'One North' },
      { userid: 'n', location: 'Cho Chu Kang' },
      { userid: 'o', location: 'Yishun' },
      { userid: 'k', location: 'Jurong' }
    ]
  }
]

As you can see, documents are grouped in batches; only the batch numbers are missing. But do you really need that number if you have the relevant documents grouped under the corresponding batch? You can add batch numbers with a JS function that is not part of the aggregation pipeline; you can execute it in the mongo shell or within your application code:

/**
 * Flattens bucket groups into one document per entry, numbering the batches.
 *
 * @param {Array<{batches: Array<{userid: *, location: *}>}>} batchgroups
 *   Groups whose `batches` arrays hold the entries to flatten.
 * @returns {Array<{_id: *, value: {location: *, batch: number}}>}
 *   One item per entry; `batch` is the 1-based position of the entry's group.
 */
function transformToBatches(batchgroups) {
  const flattened = [];
  for (let groupIdx = 0; groupIdx < batchgroups.length; groupIdx += 1) {
    const batchNumber = groupIdx + 1; // batch numbers start at 1
    for (const { userid, location } of batchgroups[groupIdx].batches) {
      flattened.push({
        _id: userid,
        value: { location: location, batch: batchNumber }
      });
    }
  }
  return flattened;
}

// Pass the result of the $bucketAuto aggregation above (the batchGroups array)
// into this function call
transformToBatches(batchGroups);

All three solutions produce the same batch assignments (the order of documents within a batch may vary between solutions):

[
  { _id: 'b', value: { location: 'Bukit Timah', batch: 1 } },
  { _id: 'a', value: { location: 'Bishan', batch: 1 } },
  { _id: 'd', value: { location: 'Segar', batch: 1 } },
  { _id: 'c', value: { location: 'Ang Mo Kio', batch: 1 } },
  { _id: 'e', value: { location: 'Fajar', batch: 1 } },
  { _id: 'f', value: { location: 'dover', batch: 2 } },
  { _id: 'i', value: { location: 'Rocher', batch: 2 } },
  { _id: 'h', value: { location: 'Marina Bay', batch: 2 } },
  { _id: 'j', value: { location: 'down town', batch: 2 } },
  { _id: 'g', value: { location: 'Buona Vista', batch: 2 } },
  { _id: 'l', value: { location: 'Pungol', batch: 3 } },
  { _id: 'm', value: { location: 'One North', batch: 3 } },
  { _id: 'n', value: { location: 'Cho Chu Kang', batch: 3 } },
  { _id: 'o', value: { location: 'Yishun', batch: 3 } },
  { _id: 'k', value: { location: 'Jurong', batch: 3 } }
]

As of 5.0 and arrival of [$setWindowFields](https://www.mongodb.com/docs/manual/reference/operator/aggregation/setWindowFields/) there is a super simple way to do this:

> db.places.aggregate([
        {$setWindowFields:{output:{batch:{$documentNumber:{}}},sortBy:{_id:1}}},
        {$set:{batch:{$ceil:{$divide:["$batch",5]}}}}
])
{ "_id" : ObjectId("680282faf8bf7676cfea7732"), "userid" : "a", "location" : "Bishan", "batch" : 1 }
{ "_id" : ObjectId("680282faf8bf7676cfea7733"), "userid" : "b", "location" : "Bukit Timah", "batch" : 1 }
{ "_id" : ObjectId("680282faf8bf7676cfea7734"), "userid" : "c", "location" : "Ang Mo Kio", "batch" : 1 }
{ "_id" : ObjectId("680282faf8bf7676cfea7735"), "userid" : "d", "location" : "Segar", "batch" : 1 }
{ "_id" : ObjectId("680282faf8bf7676cfea7736"), "userid" : "e", "location" : "Fajar", "batch" : 1 }
{ "_id" : ObjectId("680282faf8bf7676cfea7737"), "userid" : "f", "location" : "dover", "batch" : 2 }
{ "_id" : ObjectId("680282faf8bf7676cfea7738"), "userid" : "g", "location" : "Buona Vista", "batch" : 2 }
{ "_id" : ObjectId("680282faf8bf7676cfea7739"), "userid" : "h", "location" : "Marina Bay", "batch" : 2 }
{ "_id" : ObjectId("680282faf8bf7676cfea773a"), "userid" : "i", "location" : "Rocher", "batch" : 2 }
{ "_id" : ObjectId("680282faf8bf7676cfea773b"), "userid" : "j", "location" : "down town", "batch" : 2 }
{ "_id" : ObjectId("680282faf8bf7676cfea773c"), "userid" : "k", "location" : "Jurong", "batch" : 3 }
{ "_id" : ObjectId("680282faf8bf7676cfea773d"), "userid" : "l", "location" : "Pungol", "batch" : 3 }
{ "_id" : ObjectId("680282faf8bf7676cfea773e"), "userid" : "m", "location" : "One North", "batch" : 3 }
{ "_id" : ObjectId("680282faf8bf7676cfea773f"), "userid" : "n", "location" : "Cho Chu Kang", "batch" : 3 }
{ "_id" : ObjectId("680282faf8bf7676cfea7740"), "userid" : "o", "location" : "Yishun", "batch" : 3 }

The first stage, $setWindowFields, adds $documentNumber to each document, which is just a sequence number (1, 2, 3, 4, etc.), and the second stage converts that number to a batch number by dividing it by 5 and taking $ceil (aka ceiling — the next highest integer), so documents 1–5 get batch 1, documents 6–10 get batch 2, and so on.

Note that setting a sequence number with $documentNumber requires sortBy in the $setWindowFields stage so that the order is not arbitrary like it is in your original example without any sort. You can specify any indexed field if you don’t actually care about which document goes into which batch.

Hope that helps someone
Asya

1 Like