PlanExecutor Out of memory in aggregation group-stage with $accumulator

I am currently rewriting MapReduce operations as aggregation pipelines. The result structure must not change at all, because quite a lot of code relies on it and rewriting all of that in a sensible amount of time would not be practical. My issue with the rewritten job is that it crashes when too many incoming documents are processed in one run. The error message is

MongoServerError: PlanExecutor error during aggregation :: caused by :: Out of memory

I don’t quite understand what exactly causes this error, nor how to avoid it. I apologize for the length of the description; I have already trimmed the code examples as much as I can. Here are the details:

The old MapReduce jobs are running fine in production and have done so for years, though production is still on MongoDB 3.6. Development runs on MongoDB 6.0.8 with mongosh 1.10.1. The development server has 32GB of RAM and does nothing but run that one aggregation. The production server (the one still running the original MapReduce job) had 32GB of RAM as well until a fairly recent upgrade, so I know the MapReduce job can cope with that amount of RAM on the ancient MongoDB 3.6.

The jobs are run using mongosh directly on the server.

I haven’t touched the server config much; the server is the only member of the replica set:

replication:
   oplogSizeMB: 2048
   replSetName: rs0

I run

	db.adminCommand(
		{
			setParameter: 1,
			allowDiskUseByDefault: true
		}
	)

immediately before the job and the aggregation command itself is executed with explicit allowDiskUse set to true:

db.runCommand({
	"aggregate":sourceCollection,
	"pipeline":pipeline,
	allowDiskUse: true,
	cursor:{},
});

The source collection is holding information on page impressions; one such impression document may look like this:

{
	_id: ObjectId("646942e05b40688d6a004b60"),
	site: 416,
	ts: ISODate("2023-05-20T22:00:00.579Z"),
	adblock: 0,
	s: 'g',
	c: 'm',
	a_b: 1,
	a_m: 2,
	a_t: 3,
	au: 14495,
	c_t: 'a',
	c_ap: 1,
	c_t_a: 'h',
	c_ai: 1420172,
	c_t_a_pdt: ISODate("2023-05-20T10:00:00.000Z"),
	c_gi: 216337,
	c_ei: 16678,
	c_et: 217
}

To give some context: this is an impression for site id 416 with a Google referrer (s: g) from a mobile device (c: m) with no adblocker active, for content of type article (c_t: a). The article has the id 1420172, was written by the author with the id 14495 and was published on c_t_a_pdt; the article type is ‘h’, the topic has the id 16678, the genre id is 216337 and the topic type is 217. This article would potentially yield one banner ad impression and two medium-rectangle ad impressions, three ad impressions in total.

There are other, somewhat less complex impression documents for non-articles (home page, index pages, forum etc.). There usually are a few thousand up to some 100k such documents per hour. The resulting aggregated collection holds multi-dimensional breakdowns inside one document per hour and site (example below): next to total impressions and ad impressions, there are fields for totals by device, adblocked totals by device, totals by source and content type, totals by source and article type, and so on. In addition, each such document holds a couple of arrays containing aggregations for individual articles, topic ids, topic-type ids, genre ids, authors and index-page ids; each of these arrays usually holds a few hundred up to a few thousand sub-documents per hour.

The aggregation pipeline consists of a match stage, which limits the time span, a project stage, which maps the simple impression documents to the target aggregation form, and a group stage, which processes the mapped documents. The group stage simply increments most keys; only the nested documents are upserted into their array fields. It uses a custom $accumulator, because I need to deal with the nested arrays and I need to update the current hour with incoming data every few minutes using a subsequent merge stage. The accumulator function is assembled dynamically from a configuration of fields, and while the resulting function is long, it is far from complicated.
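To make the stage layout concrete, here is a minimal sketch of the first two stages. The helper name `buildStages` and the two field mappings are assumptions for illustration only; the real projection is generated from a configuration and is far longer. `$literal` is used because a bare `1` inside `$project` would be read as a field inclusion.

```javascript
// Hypothetical helper: builds the $match and $project stages for one time span.
// The field mappings are illustrative assumptions, not the real configuration.
function buildStages(from, to) {
	return [
		// limit the time span
		{ $match: { ts: { $gte: from, $lt: to } } },
		// map one impression to the { key, values } form consumed by $group
		{ $project: {
			emits: {
				key: {
					site: "$site",
					t: { $dateTrunc: { date: "$ts", unit: "hour" } }
				},
				values: {
					pi: { $literal: 1 },
					pi_c_m: { $cond: [ { $eq: [ "$c", "m" ] }, 1, 0 ] }
				}
			}
		} }
	];
}

const pipeline = buildStages(
	new Date("2023-05-20T22:00:00Z"),
	new Date("2023-05-20T23:00:00Z")
);
```

The group stage shown further down is then pushed onto this array.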

While the new aggregation job is running fine as long as only some one or two thousand documents are processed, I get an error from the group stage if it is fed with more than about 10,000 documents. The error, as stated above, is

MongoServerError: PlanExecutor error during aggregation :: caused by :: Out of memory

If I simply “$out” the project stage to a test collection without grouping it, everything is working fine. If I put the out-stage after the group however, I get the error, so it’s not in the merge stage.

A projected document may look like this (abbreviated, the full document is about 500 lines long):

{
  "_id": {
    "$oid": "646942e021893395200d8872"
  },
  "emits": {
    "key": {
      "site": 1,
      "t": {
        "$date": "2023-05-20T22:00:00.000Z"
      }
    },
    "values": {
      "pi": 1,
      "pi_ab": 0,
      "pi_c_m": 1,
      "pi_c_m_ab": 0,
      "pi_c_d": 0,
      "pi_c_d_ab": 0,
      "pi_c_t": 0,
      "pi_c_t_ab": 0,
      "s_d": 0,
      "s_d_ab": 0,
      "s_d_m": 0,
      "s_d_d": 0,
      [... more simple increment fields]
      "art": [
        {
          "id": 1408831,
          "pi": 1,
          "pi_ab": 0,
          "a_t": 3,
          "a_b": 1,
          "a_s": 0,
          "a_m": 2,
          "a_l": 0,
          "a_o": 0,
          "a_v": 0,
          "a_p": 0,
          "c_a_pi": 1,
          "c_a_px": 1,
          "c_a_p1": 1,
          "c_h": 0,
          "c_l": 0,
          "c_a": 1,
          "c_g": 0,
          "c_f": 0,
          "c_b": 0,
          "c_c": 0,
          "c_s": 0,
          "c_o": 0,
          "t": "t",
          "eid": "278622",
          "etid": "217",
          "gid": 157,
          "auid": [
            3037
          ],
          "pubdt": {
            "$date": "2022-12-06T10:00:00.000Z"
          }
        }
      ],
      [... more similar array of object fields]
    }
  }
}

The reduce-function for the $accumulator looks like this; it makes no difference if I omit the processing of the nested keys:

const reduceFunction = function(state, newVal){
	/* 
		we want to use the same reduce function for accumulate and merge
		the latter does not support passing of outside variables/constants
		we therefore need to hardcode the nest configuration unfortunately
		*/
	const nestKeys = ['aut','gen','ett','ent','art','lp'];
	
	/* 
		these are keys inside nested documents that contain values that must not be summed up, such as ids or dates
	*/
	const noSumDocKeys = ['etid','eid','gid','t','auid','pubdt','lt'];

	const newKeys = Object.keys(newVal);
	const sk = newKeys.filter(key => !nestKeys.includes(key));
	const nk = newKeys.filter(key => nestKeys.includes(key));
	// crashes even with nk = []
	
	/* 
		sk -> sumKeys contain the simple values
		*/
	sk.forEach(key =>{
		if (typeof newVal[key] === 'number'){
			if (typeof state[key] === 'undefined'){
				state[key] = newVal[key];
			} else {
				state[key] += newVal[key];
			}
		}
	});

	/* 
		nk -> nestKeys contain arrays of documents that need to be merged
		*/
	nk.forEach(key =>{
		if (typeof state[key] === 'undefined' || !Array.isArray(state[key]) || state[key].length === 0 ){
			/* 
				if state is still empty, we can just assign 
				the whole document as the only element
				*/
			state[key] = newVal[key];
		} else {
			/* 
				we need to merge the two arrays
				*/
			newVal[key].forEach(newDoc =>{
				// check if the old state already contains a document with that id
				const docIndex = state[key].findIndex(obj => obj.id === newDoc.id);
				
				if (docIndex < 0){
					/* 
						new doc is not contained in the old state, so we simply
						add it to the array
						*/
					state[key].push(newDoc);
				} else {
					/* 
						we iterate over the keys of the newDoc
						anything not id and not in noSumDocKeys is summed
						up into the state
						*/
					Object.keys(newDoc).forEach(docKey => {

						const newDocVal = newDoc[docKey];

						if (docKey !== 'id'){
							if (typeof state[key][docIndex][docKey] === 'undefined'){
								// if current state doesn't contain that key, simply set it
								state[key][docIndex][docKey] = newDocVal;
							} else {
								switch(docKey){
									case 'auid':
										// special case: merge two arrays of author ids to a unique array
										state[key][docIndex][docKey] = [...new Set([...state[key][docIndex][docKey], ...newDocVal])];
										break;
									default:
										if (noSumDocKeys.includes(docKey)){
											// simply set the value
											state[key][docIndex][docKey] = newDocVal;
										} else {
											// increment the value
											state[key][docIndex][docKey] += newDocVal;
										}
										break;
								}
							}
						}

					});

				}
			});

		}
	});

	return state;

};
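The nested-array part of the function above can be tried outside MongoDB with a compact standalone restatement. This is only a sketch of the same rules (append unknown ids, union `auid`, overwrite the no-sum keys, add everything else); `mergeById` is a hypothetical name and the sample documents are made up.

```javascript
// Standalone restatement of the nested-array merge, for experimenting in plain JS.
// `mergeById` is a hypothetical name; the rules mirror the reduce function above.
function mergeById(stateDocs, newDocs, noSumKeys) {
	newDocs.forEach(newDoc => {
		const existing = stateDocs.find(doc => doc.id === newDoc.id);
		if (!existing) {
			// unseen id: append the document as-is
			stateDocs.push(newDoc);
			return;
		}
		Object.keys(newDoc).forEach(key => {
			if (key === 'id') return;
			if (typeof existing[key] === 'undefined') {
				// key not present in the state yet: simply set it
				existing[key] = newDoc[key];
			} else if (key === 'auid') {
				// merge two arrays of author ids into a unique array
				existing[key] = [...new Set([...existing[key], ...newDoc[key]])];
			} else if (noSumKeys.includes(key)) {
				// ids, types and dates are overwritten, never summed
				existing[key] = newDoc[key];
			} else {
				// counters are incremented
				existing[key] += newDoc[key];
			}
		});
	});
	return stateDocs;
}

const merged = mergeById(
	[{ id: 1, pi: 2, t: 'h', auid: [3037] }],
	[{ id: 1, pi: 3, t: 'n', auid: [3037, 16722] }, { id: 2, pi: 1 }],
	['etid', 'eid', 'gid', 't', 'auid', 'pubdt', 'lt']
);
```

Here the document with id 1 is updated in place and the document with id 2 is appended.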

A result document may look like this. I have kept only two nested documents in one of the structures; there may be well over 1,000 sub-documents in the “art” array in particular. The total size of one such result document is a little over 1,000 kilobytes.

{
  "_id": {
    "site": 1,
    "t": {
      "$date": "2023-05-20T22:00:00.000Z"
    }
  },
  "value": {
    "pi": 11050,
    "pi_ab": 460,
    "pi_c_m": 9738,
    "pi_c_m_ab": 110,
    "pi_c_d": 1181,
    "pi_c_d_ab": 348,
    "pi_c_t": 131,
    "pi_c_t_ab": 2,
    "s_d": 1200,
    "s_d_ab": 98,
    "s_d_m": 733,
    "s_d_d": 457,
	[... more simple increment fields]
    "aut": [],
    "gen": [],
    "ett": [],
    "ent": [],
    "art": [
      {
        "id": 1419484,
        "pi": 196,
        "pi_ab": 14,
        "a_t": 617,
        "a_b": 182,
        "a_s": 13,
        "a_m": 383,
        "a_l": 13,
        "a_o": 26,
        "a_v": 0,
        "a_p": 0,
        "c_a_pi": 196,
        "c_a_px": 195,
        "c_a_p1": 55,
        "c_h": 0,
        "c_l": 0,
        "c_a": 182,
        "c_g": 14,
        "c_f": 0,
        "c_b": 0,
        "c_c": 0,
        "c_s": 0,
        "c_o": 0,
        "t": "h",
        "eid": "274783",
        "etid": "5115",
        "gid": 142,
        "auid": [
          3037
        ],
        "pubdt": {
          "$date": "2023-05-12T11:27:00.000Z"
        }
      },
      {
        "id": 1420177,
        "pi": 589,
        "pi_ab": 3,
        "a_t": 1828,
        "a_b": 586,
        "a_s": 14,
        "a_m": 1186,
        "a_l": 14,
        "a_o": 28,
        "a_v": 0,
        "a_p": 0,
        "c_a_pi": 589,
        "c_a_px": 581,
        "c_a_p1": 579,
        "c_h": 0,
        "c_l": 0,
        "c_a": 587,
        "c_g": 2,
        "c_f": 0,
        "c_b": 0,
        "c_c": 0,
        "c_s": 0,
        "c_o": 0,
        "t": "n",
        "eid": "25101",
        "etid": "821",
        "auid": [
          16722
        ],
        "pubdt": {
          "$date": "2023-05-20T08:52:00.000Z"
        }
      },
	  [... more article documents]
    ],
    "lp": []
  }
}

… and this is the init-function:

const initFunction = function(sk, nk){
	const initState = {};

	// sumKeys
	sk.forEach(key =>{
		initState[key] = 0;
	});
	// nestKeys
	nk.forEach(key =>{
		initState[key] = [];
	});
	
	return initState;
};

The group-stage looks like this:

pipeline.push( { '$group': {
	_id: "$emits.key",
	value: {
		$accumulator: {
			init: initFunction,
			initArgs: [ sumKeys, nestKeys ],
			accumulate: reduceFunction,
			accumulateArgs: [ "$emits.values" ],
			merge: reduceFunction,
			lang: "js"
		}
	}
}});

Anything that may shed some light on the issue would help tremendously. I would be uneasy if the aggregation could crash some day just because we collected a few too many impressions within a few minutes. It would also be helpful if the aggregation could simply catch up after not having run for a few hours. So far, I have implemented a hard limit of processing just five minutes’ worth of data at a time. This workaround seems far from elegant though.
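For reference, the five-minute workaround boils down to splitting the backlog into consecutive time windows and running the pipeline once per window. A minimal sketch, assuming half-open windows on the `ts` field; `timeWindows` is a hypothetical helper name, not part of the actual job:

```javascript
// Hypothetical helper: yields consecutive half-open windows [from, to)
// of at most stepMs milliseconds between start and end.
function* timeWindows(start, end, stepMs = 5 * 60 * 1000) {
	const endMs = end.getTime();
	for (let from = start.getTime(); from < endMs; from += stepMs) {
		const to = Math.min(from + stepMs, endMs);
		yield { from: new Date(from), to: new Date(to) };
	}
}

// A 12-minute backlog is split into two full windows and one 2-minute remainder.
const windows = [...timeWindows(
	new Date("2023-05-20T22:00:00Z"),
	new Date("2023-05-20T22:12:00Z")
)];

// Each window would then feed the match stage of one run, e.g.
// { $match: { ts: { $gte: from, $lt: to } } }
```

Because the windows are half-open, no impression is counted twice when the results of consecutive runs are merged.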

Kind regards

Markus

I have managed some optimization towards increasing the number of documents processed in the $group stage without a crash. Before, I couldn’t get it to process even 5,000 documents at a time, now I can manage about 23,000 documents without a crash (though it still crashes somewhere between 25,000 and 30,000 documents).

All I did was replace the 0 values set in the $project stage with $$REMOVE, so that only fields with a non-zero value end up in the projection. The documents streaming into the $group stage are now sparse, whereas in the original code most fields were assigned a value of 0.
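A minimal sketch of that change, using the `$cond` plus `$$REMOVE` pattern. The helper name `flagOrRemove` and the two conditions are assumptions for illustration; the real projection is generated from a field configuration.

```javascript
// Hypothetical helper: project 1 when `condition` holds, otherwise omit the field
// entirely instead of projecting 0.
function flagOrRemove(condition) {
	return { $cond: [ condition, 1, "$$REMOVE" ] };
}

// Illustrative excerpt of the "values" part of the $project stage.
const valuesProjection = {
	pi: { $literal: 1 },
	pi_ab: flagOrRemove({ $eq: [ "$adblock", 1 ] }),
	pi_c_m: flagOrRemove({ $eq: [ "$c", "m" ] })
};
```

For the sample impression shown earlier (adblock: 0, c: 'm'), this would project pi and pi_c_m but no pi_ab field at all.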

What puzzles me though: I was under the impression that setting allowDiskUse: true would in fact write a temporary result and flush the documents already processed if the memory of the $group stage ran too close to the limit, and then continue processing the next batch, merging each new result with the temporary one on disk until all incoming documents were processed.

This does not seem to be the case. If I feed too many documents into the $group stage, it just bombs, and there seems to be no surefire way to avoid that: neither a server setting that would allow assigning more memory to certain aggregations, nor any way to force the $group stage to actually use the disk if allowDiskUse is true.

I didn’t use to see this issue with mapReduce in MongoDB 3.6, so having to worry about aggregations failing is definitely not an improvement in my book. Sure, mapReduce took a long time, but I could count on it not failing, even when processing a much larger number of documents, as long as allowDiskUse was enabled.

Could somebody please enlighten me about what’s going wrong here?