I am currently rewriting some MapReduce operations as aggregation pipelines. The result structure must not change at all: quite a lot of code relies on it, and rewriting all of that in a reasonable amount of time is not practical. My problem is that the rewritten job crashes when too many incoming documents are processed in one run. The error message is:
MongoServerError: PlanExecutor error during aggregation :: caused by :: Out of memory
I don't quite understand what exactly causes this error, nor how to avoid it. I am very sorry for the length of the description; I have already trimmed the code examples down as much as I can. Here are the details:
The old MapReduce jobs have been running fine in production for years, although production is still on MongoDB 3.6. Development runs MongoDB 6.0.8 with mongosh 1.10.1. The development server has 32GB of RAM and does nothing but run that one aggregation. The production server (the one still running the original MapReduce job) also had 32GB of RAM until a fairly recent upgrade, so I know the MapReduce job copes with that amount of RAM on the ancient MongoDB 3.6.
The jobs are run using mongosh directly on the server.
I haven't changed the server configuration much; the server is the only member of the replica set:
replication:
  oplogSizeMB: 2048
  replSetName: rs0
I run
db.adminCommand(
  {
    setParameter: 1,
    allowDiskUseByDefault: true
  }
)
immediately before the job, and the aggregate command itself is executed with allowDiskUse explicitly set to true:
db.runCommand({
  "aggregate": sourceCollection,
  "pipeline": pipeline,
  allowDiskUse: true,
  cursor: {},
});
The source collection holds page impressions; a typical impression document looks like this:
{
  _id: ObjectId("646942e05b40688d6a004b60"),
  site: 416,
  ts: ISODate("2023-05-20T22:00:00.579Z"),
  adblock: 0,
  s: 'g',
  c: 'm',
  a_b: 1,
  a_m: 2,
  a_t: 3,
  au: 14495,
  c_t: 'a',
  c_ap: 1,
  c_t_a: 'h',
  c_ai: 1420172,
  c_t_a_pdt: ISODate("2023-05-20T10:00:00.000Z"),
  c_gi: 216337,
  c_ei: 16678,
  c_et: 217
}
To give some context: this is an impression for site id 416 with a Google referrer (s: g) from a mobile device (c: m) with no adblocker active. The content is of type article (c_t: a): the article has the id 1420172, was written by the author with the id 14495, was published at c_t_a_pdt, and its article type is 'h'. The topic has the id 16678, a genre id of 216337 and a topic type of 217. This article would potentially yield one banner ad impression and two medium-rectangle ad impressions, three ad impressions in total.
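To make the mapping concrete, here is a minimal plain-JavaScript sketch of how the device-related counters of a projected document could be derived from one impression. It assumes that `c` is one of 'm', 'd' or 't' (the suffixes used by the pi_c_* fields) and that `adblock` is 0 or 1; this is consistent with the sample result document, where pi_c_m + pi_c_d + pi_c_t equals pi. The helper name `deviceCounters` is hypothetical, and the real project stage computes these fields with aggregation expressions rather than JavaScript.

```javascript
// Hypothetical helper: derives the per-device counters of a projected document
// from one impression. Assumes c is one of 'm', 'd', 't' (matching the pi_c_* suffixes)
// and adblock is 0 or 1.
const DEVICES = ["m", "d", "t"];

function deviceCounters(impression) {
  const ab = impression.adblock ? 1 : 0;
  // Every impression counts once in the overall totals.
  const counters = { pi: 1, pi_ab: ab };
  for (const device of DEVICES) {
    const hit = impression.c === device ? 1 : 0;
    counters[`pi_c_${device}`] = hit;
    // Adblocked impressions per device.
    counters[`pi_c_${device}_ab`] = hit * ab;
  }
  return counters;
}

// Returns { pi: 1, pi_ab: 0, pi_c_m: 1, pi_c_m_ab: 0, pi_c_d: 0, pi_c_d_ab: 0, pi_c_t: 0, pi_c_t_ab: 0 }
console.log(deviceCounters({ site: 416, adblock: 0, s: "g", c: "m" }));
```

In a $project stage the same idea would be written with $cond, e.g. `pi_c_m: { $cond: [{ $eq: ["$c", "m"] }, 1, 0] }`.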
Impression documents for non-article pages (home page, index pages, forum, etc.) are somewhat simpler. There are usually between a few thousand and about 100k impression documents per hour. The resulting aggregated collection holds multi-dimensional breakdowns in one document per hour and site (example below): besides total impressions and ad impressions, there are fields for totals by device, adblocked totals by device, totals by source and content type, totals by source and article type, etc. In addition, each such document holds a couple of arrays containing aggregations for individual articles, topic ids, topic-type ids, genre ids, authors and index-page ids; each of these arrays usually holds a few hundred up to a few thousand sub-documents per hour.
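Since the result holds one document per site and hour, every impression has to be mapped to a key made of the site id and the start of its hour. The sample keys use hour starts in UTC, such as 2023-05-20T22:00:00.000Z. A minimal sketch of that key computation, assuming UTC truncation; `hourKey` is a hypothetical helper name. Inside the pipeline itself, MongoDB 5.0 and later provide `$dateTrunc` (e.g. `{ $dateTrunc: { date: "$ts", unit: "hour" } }`) for the same purpose.

```javascript
// Hypothetical helper: builds the grouping key { site, t } for one impression,
// assuming hours are truncated in UTC (as in the sample keys).
const HOUR_MS = 60 * 60 * 1000;

function hourKey(impression) {
  const ms = impression.ts.getTime();
  // JavaScript timestamps are UTC milliseconds without leap seconds,
  // so flooring to a multiple of one hour yields the start of the UTC hour.
  return { site: impression.site, t: new Date(Math.floor(ms / HOUR_MS) * HOUR_MS) };
}

const key = hourKey({ site: 416, ts: new Date("2023-05-20T22:00:00.579Z") });
console.log(key.site, key.t.toISOString()); // 416 2023-05-20T22:00:00.000Z
```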
The aggregation pipeline consists of a match stage that limits the time span, a project stage that maps the simple impression documents to the target aggregation form, and a group stage that processes the mapped documents, simply incrementing most keys and upserting only the nested documents into their array fields. The group stage uses a custom $accumulator because I need to handle the nested arrays, and because the current hour has to be updated with incoming data every few minutes by a subsequent merge stage. The accumulator function is assembled dynamically from a configuration of fields; the resulting function is long, but far from complicated.
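The stage order described above can be outlined as follows. This is only a skeleton under assumptions: `buildPipeline`, the target collection name `impressions_hourly` and the `projectSpec`/`groupSpec` parameters are hypothetical placeholders, and the time window is assumed to be half-open [from, to) on `ts`. How the real job combines a new run with an already stored hour document is not part of this sketch, so the $merge options are left at their defaults.

```javascript
// Hypothetical outline of the pipeline: $match -> $project -> $group -> $merge.
// projectSpec and groupSpec stand in for the real (generated) stage bodies.
function buildPipeline({ from, to, projectSpec, groupSpec, target }) {
  return [
    // Restrict the run to a half-open time window [from, to) on the impression timestamp.
    { $match: { ts: { $gte: from, $lt: to } } },
    // Map each impression to { emits: { key, values } }.
    { $project: projectSpec },
    // Combine all mapped documents per { site, t } key (the $accumulator lives here).
    { $group: groupSpec },
    // Write the hourly documents; whenMatched/whenNotMatched are left at their defaults.
    { $merge: { into: target, on: "_id" } },
  ];
}

const pipeline = buildPipeline({
  from: new Date("2023-05-20T22:00:00Z"),
  to: new Date("2023-05-20T22:05:00Z"),
  projectSpec: {}, // placeholder, not a real projection
  groupSpec: { _id: "$emits.key" },
  target: "impressions_hourly",
});
console.log(pipeline.map((stage) => Object.keys(stage)[0]).join(" -> "));
// $match -> $project -> $group -> $merge
```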
The new aggregation job runs fine as long as only one or two thousand documents are processed, but the group stage fails once it is fed more than about 10,000 documents. The error, as stated above, is
MongoServerError: PlanExecutor error during aggregation :: caused by :: Out of memory
If I simply $out the result of the project stage to a test collection without grouping, everything works fine. If I put the $out stage after the group stage instead, I get the error, so the problem is not in the merge stage.
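The narrowing-down described above (cutting the pipeline after a given stage and writing the intermediate result with $out) can be wrapped in a small helper. `pipelineUpTo` and the collection name `debug_out` are hypothetical. Since $out and $merge must each be the last stage of a pipeline, the helper refuses to append $out after either of them.

```javascript
// Hypothetical debugging helper: keeps the first `stageCount` stages of a pipeline
// and appends an $out stage so the intermediate result can be inspected.
function pipelineUpTo(pipeline, stageCount, outCollection) {
  if (stageCount < 1 || stageCount > pipeline.length) {
    throw new RangeError(`stageCount must be between 1 and ${pipeline.length}`);
  }
  const head = pipeline.slice(0, stageCount);
  const last = Object.keys(head[head.length - 1])[0];
  // $out and $merge must be the final stage, so they cannot be followed by another $out.
  if (last === "$out" || last === "$merge") {
    throw new Error(`stage ${stageCount} is already a ${last} stage`);
  }
  return [...head, { $out: outCollection }];
}

const stages = [{ $match: {} }, { $project: {} }, { $group: {} }, { $merge: {} }];
// Cut after the project stage:
console.log(pipelineUpTo(stages, 2, "debug_out").map((s) => Object.keys(s)[0]));
// [ '$match', '$project', '$out' ]
```

In mongosh, the truncated pipeline could then be run with `db.getCollection(sourceCollection).aggregate(pipelineUpTo(pipeline, 3, "debug_out"), { allowDiskUse: true })`.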
A projected document looks like this (abbreviated; the full document is about 500 lines long):
{
  "_id": {
    "$oid": "646942e021893395200d8872"
  },
  "emits": {
    "key": {
      "site": 1,
      "t": {
        "$date": "2023-05-20T22:00:00.000Z"
      }
    },
    "values": {
      "pi": 1,
      "pi_ab": 0,
      "pi_c_m": 1,
      "pi_c_m_ab": 0,
      "pi_c_d": 0,
      "pi_c_d_ab": 0,
      "pi_c_t": 0,
      "pi_c_t_ab": 0,
      "s_d": 0,
      "s_d_ab": 0,
      "s_d_m": 0,
      "s_d_d": 0,
      [... more simple increment fields]
      "art": [
        {
          "id": 1408831,
          "pi": 1,
          "pi_ab": 0,
          "a_t": 3,
          "a_b": 1,
          "a_s": 0,
          "a_m": 2,
          "a_l": 0,
          "a_o": 0,
          "a_v": 0,
          "a_p": 0,
          "c_a_pi": 1,
          "c_a_px": 1,
          "c_a_p1": 1,
          "c_h": 0,
          "c_l": 0,
          "c_a": 1,
          "c_g": 0,
          "c_f": 0,
          "c_b": 0,
          "c_c": 0,
          "c_s": 0,
          "c_o": 0,
          "t": "t",
          "eid": "278622",
          "etid": "217",
          "gid": 157,
          "auid": [
            3037
          ],
          "pubdt": {
            "$date": "2022-12-06T10:00:00.000Z"
          }
        }
      ],
      [... more similar array of object fields]
    }
  }
}
The reduce function for the $accumulator looks like this; omitting the processing of the nested keys makes no difference:
const reduceFunction = function(state, newVal){
  /*
    we want to use the same reduce function for accumulate and merge
    the latter does not support passing of outside variables/constants
    we therefore need to hardcode the nest configuration unfortunately
  */
  const nestKeys = ['aut','gen','ett','ent','art','lp'];
  /*
    these are keys inside nested documents that contain values that must not be summed up, such as ids or dates
  */
  const noSumDocKeys = ['etid','eid','gid','t','auid','pubdt','lt'];
  const newKeys = Object.keys(newVal);
  const sk = newKeys.filter(key => !nestKeys.includes(key));
  const nk = newKeys.filter(key => nestKeys.includes(key));
  // crashes even with nk = []
  /*
    sk -> sumKeys contain the simple values
  */
  sk.forEach(key =>{
    if (typeof newVal[key] == 'number'){
      if (typeof state[key] === 'undefined'){
        state[key] = newVal[key];
      } else {
        state[key] += newVal[key];
      }
    }
  });
  /*
    nk -> nestKeys contain arrays of documents that need to be merged
  */
  nk.forEach(key =>{
    if (typeof state[key] === 'undefined' || !Array.isArray(state[key]) || state[key].length == 0 ){
      /*
        if the state array is still empty, we can simply
        take over the incoming array as a whole
      */
      state[key] = newVal[key];
    } else {
      /*
        we need to merge the two arrays
      */
      newVal[key].forEach(newDoc =>{
        // check if the old state already contains a document with that id
        const docIndex = state[key].findIndex(obj => obj.id === newDoc.id);
        if (docIndex < 0){
          /*
            new doc is not contained in the old state, so we simply
            add it to the array
          */
          state[key].push(newDoc);
        } else {
          /*
            we iterate over the keys of the newDoc
            anything not id and not in noSumDocKeys is summed
            up into the state
          */
          Object.keys(newDoc).forEach(docKey => {
            const newDocVal = newDoc[docKey];
            if (docKey !== 'id'){
              if (typeof state[key][docIndex][docKey] === 'undefined'){
                // if current state doesn't contain that key, simply set it
                state[key][docIndex][docKey] = newDocVal;
              } else {
                switch(docKey){
                  case 'auid':
                    // special case: merge two arrays of author ids to a unique array
                    state[key][docIndex][docKey] = [...new Set([...state[key][docIndex][docKey], ...newDocVal])];
                    break;
                  default:
                    if (noSumDocKeys.includes(docKey)){
                      // simply set the value
                      state[key][docIndex][docKey] = newDocVal;
                    } else {
                      // increment the value
                      state[key][docIndex][docKey] += newDocVal;
                    }
                    break;
                }
              }
            }
          });
        }
      });
    }
  });
  return state;
};
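The nested-array part of the reduce function boils down to "upsert by id, summing counters". The following condensed sketch isolates that logic for a single array so it can be exercised outside MongoDB, for example with Node.js. `mergeNested` is a hypothetical name; it follows the same rules as the function above (ids matched with ===, missing keys taken over, auid arrays unioned, keys in noSumDocKeys overwritten, everything else summed), but it is an illustration, not a drop-in replacement.

```javascript
// Condensed sketch of the nested-array merge in reduceFunction (hypothetical helper).
const NO_SUM_DOC_KEYS = ["etid", "eid", "gid", "t", "auid", "pubdt", "lt"];

function mergeNested(stateArr, newArr) {
  for (const newDoc of newArr) {
    const existing = stateArr.find((obj) => obj.id === newDoc.id);
    if (!existing) {
      // Unknown id: append the whole sub-document.
      stateArr.push(newDoc);
      continue;
    }
    for (const [docKey, newDocVal] of Object.entries(newDoc)) {
      if (docKey === "id") continue;
      if (existing[docKey] === undefined) {
        existing[docKey] = newDocVal; // key missing in state: take it over
      } else if (docKey === "auid") {
        existing[docKey] = [...new Set([...existing[docKey], ...newDocVal])]; // unique author ids
      } else if (NO_SUM_DOC_KEYS.includes(docKey)) {
        existing[docKey] = newDocVal; // ids/dates: last value wins
      } else {
        existing[docKey] += newDocVal; // counters: sum up
      }
    }
  }
  return stateArr;
}

const merged = mergeNested(
  [{ id: 1420177, pi: 1, a_t: 3, auid: [16722] }],
  [{ id: 1420177, pi: 1, a_t: 2, auid: [16722, 3037] }, { id: 1408831, pi: 1, a_t: 3 }]
);
console.log(JSON.stringify(merged));
// [{"id":1420177,"pi":2,"a_t":5,"auid":[16722,3037]},{"id":1408831,"pi":1,"a_t":3}]
```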
A result document looks like this. I have kept only two nested documents in one of the arrays; the "art" array in particular may hold well over 1,000 sub-documents, and the total size of one such result document is a little over 1,000 kilobytes.
{
  "_id": {
    "site": 1,
    "t": {
      "$date": "2023-05-20T22:00:00.000Z"
    }
  },
  "value": {
    "pi": 11050,
    "pi_ab": 460,
    "pi_c_m": 9738,
    "pi_c_m_ab": 110,
    "pi_c_d": 1181,
    "pi_c_d_ab": 348,
    "pi_c_t": 131,
    "pi_c_t_ab": 2,
    "s_d": 1200,
    "s_d_ab": 98,
    "s_d_m": 733,
    "s_d_d": 457,
    [... more simple increment fields]
    "aut": [],
    "gen": [],
    "ett": [],
    "ent": [],
    "art": [
      {
        "id": 1419484,
        "pi": 196,
        "pi_ab": 14,
        "a_t": 617,
        "a_b": 182,
        "a_s": 13,
        "a_m": 383,
        "a_l": 13,
        "a_o": 26,
        "a_v": 0,
        "a_p": 0,
        "c_a_pi": 196,
        "c_a_px": 195,
        "c_a_p1": 55,
        "c_h": 0,
        "c_l": 0,
        "c_a": 182,
        "c_g": 14,
        "c_f": 0,
        "c_b": 0,
        "c_c": 0,
        "c_s": 0,
        "c_o": 0,
        "t": "h",
        "eid": "274783",
        "etid": "5115",
        "gid": 142,
        "auid": [
          3037
        ],
        "pubdt": {
          "$date": "2023-05-12T11:27:00.000Z"
        }
      },
      {
        "id": 1420177,
        "pi": 589,
        "pi_ab": 3,
        "a_t": 1828,
        "a_b": 586,
        "a_s": 14,
        "a_m": 1186,
        "a_l": 14,
        "a_o": 28,
        "a_v": 0,
        "a_p": 0,
        "c_a_pi": 589,
        "c_a_px": 581,
        "c_a_p1": 579,
        "c_h": 0,
        "c_l": 0,
        "c_a": 587,
        "c_g": 2,
        "c_f": 0,
        "c_b": 0,
        "c_c": 0,
        "c_s": 0,
        "c_o": 0,
        "t": "n",
        "eid": "25101",
        "etid": "821",
        "auid": [
          16722
        ],
        "pubdt": {
          "$date": "2023-05-20T08:52:00.000Z"
        }
      },
      [... more article documents]
    ],
    "lp": []
  }
}
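The sample result document also shows how the counters relate to each other, which can serve as a quick sanity check when comparing the output of the old MapReduce job with the new pipeline. In the sample, pi_c_m + pi_c_d + pi_c_t equals pi (9738 + 1181 + 131 = 11050), the adblocked device counters add up to pi_ab (110 + 348 + 2 = 460), and in each art entry a_t equals the sum of a_b, a_s, a_m, a_l, a_o, a_v and a_p (182 + 13 + 383 + 13 + 26 = 617). The sketch below assumes these relations hold in general; `checkHourDoc` is a hypothetical helper.

```javascript
// Hypothetical sanity check for one hourly result document, assuming the
// device counters partition pi / pi_ab and a_t is the sum of the ad-slot counters.
const AD_SLOTS = ["a_b", "a_s", "a_m", "a_l", "a_o", "a_v", "a_p"];

function checkHourDoc(doc) {
  const v = doc.value;
  const problems = [];
  if (v.pi_c_m + v.pi_c_d + v.pi_c_t !== v.pi) problems.push("device totals != pi");
  if (v.pi_c_m_ab + v.pi_c_d_ab + v.pi_c_t_ab !== v.pi_ab) problems.push("adblock totals != pi_ab");
  for (const art of v.art || []) {
    // Missing slot counters are treated as 0.
    const slots = AD_SLOTS.reduce((sum, k) => sum + (art[k] || 0), 0);
    if (slots !== art.a_t) problems.push(`art ${art.id}: a_t ${art.a_t} != ${slots}`);
  }
  return problems;
}

// Values taken from the sample result document above (art array shortened).
const sampleDoc = {
  value: {
    pi: 11050, pi_ab: 460,
    pi_c_m: 9738, pi_c_m_ab: 110, pi_c_d: 1181, pi_c_d_ab: 348, pi_c_t: 131, pi_c_t_ab: 2,
    art: [{ id: 1419484, a_t: 617, a_b: 182, a_s: 13, a_m: 383, a_l: 13, a_o: 26, a_v: 0, a_p: 0 }],
  },
};
console.log(checkHourDoc(sampleDoc)); // []
```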
… and this is the init-function:
const initFunction = function(sk, nk){
  const initState = {};
  // sumKeys
  sk.forEach(key =>{
    initState[key] = 0;
  });
  // nestKeys
  nk.forEach(key =>{
    initState[key] = [];
  });
  return initState;
};
The group-stage looks like this:
pipeline.push( { '$group': {
  _id: "$emits.key",
  value: {
    $accumulator: {
      init: initFunction,
      initArgs: [ sumKeys, nestKeys ],
      accumulate: reduceFunction,
      accumulateArgs: [ "$emits.values" ],
      merge: reduceFunction,
      lang: "js"
    }
  }
}});
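The group stage passes sumKeys and nestKeys as initArgs, but their definition is not shown here. One possible way to obtain them, sketched purely as an assumption, is to split the keys of a sample `emits.values` object by whether the value is an array or a number; `splitKeys` is hypothetical, since the real job builds these lists from its field configuration. The init function is repeated so the block runs on its own.

```javascript
// Hypothetical: derive the initArgs [sumKeys, nestKeys] from one sample emits.values object.
function splitKeys(values) {
  const sumKeys = [];
  const nestKeys = [];
  for (const [key, val] of Object.entries(values)) {
    if (Array.isArray(val)) nestKeys.push(key);
    else if (typeof val === "number") sumKeys.push(key);
    // Anything else (strings, dates, ...) is neither summed nor nested.
  }
  return [sumKeys, nestKeys];
}

// Same logic as the init function above.
const initFunction = function (sk, nk) {
  const initState = {};
  sk.forEach((key) => { initState[key] = 0; });
  nk.forEach((key) => { initState[key] = []; });
  return initState;
};

const [sumKeys, nestKeys] = splitKeys({ pi: 1, pi_ab: 0, pi_c_m: 1, art: [{ id: 1408831, pi: 1 }], lp: [] });
console.log(JSON.stringify(initFunction(sumKeys, nestKeys)));
// {"pi":0,"pi_ab":0,"pi_c_m":0,"art":[],"lp":[]}
```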
Anything that may shed some light on the issue would help tremendously. I would feel uneasy knowing that the aggregation might crash some day simply because we collected a few too many impressions within a few minutes. It would also be helpful if the aggregation could simply catch up after not running for a few hours. So far, I have implemented a hard limit of processing just five minutes' worth of data at a time, but this workaround seems far from elegant.
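The five-minute limit can also be used for catching up, by walking through the missed interval in consecutive windows and running the pipeline once per window. A minimal sketch of the window generation; `timeWindows` is a hypothetical helper, and the windows are half-open [from, to), so no impression falls into two windows as long as the $match stage uses $gte/$lt on ts.

```javascript
// Hypothetical helper: splits [start, end) into consecutive windows of at most stepMs,
// e.g. to catch up on several hours in five-minute chunks.
function* timeWindows(start, end, stepMs) {
  if (stepMs <= 0) throw new RangeError("stepMs must be positive");
  for (let t = start.getTime(); t < end.getTime(); t += stepMs) {
    // The last window is clipped to `end`.
    yield { from: new Date(t), to: new Date(Math.min(t + stepMs, end.getTime())) };
  }
}

const FIVE_MINUTES = 5 * 60 * 1000;
const windows = [...timeWindows(
  new Date("2023-05-20T22:00:00Z"),
  new Date("2023-05-20T22:12:00Z"),
  FIVE_MINUTES
)];
// Three windows: 22:00-22:05, 22:05-22:10 and the clipped 22:10-22:12.
console.log(windows.map((w) => `${w.from.toISOString()} .. ${w.to.toISOString()}`));
```

Each window's `from`/`to` would then go into the $match stage, with one aggregate call per window.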
Kind regards
Markus