Slow $group or $set stage of aggregation pipeline

Hello,
Recently we observed that one of our queries has slowed down. I made a change that helped a little, but I wanted to ask whether the aggregation pipeline can be improved further.

The collection stores documents for many externalIds. Each document has a coll array whose elements contain the idObject and lastUpdated objects.

The goal of the query is to get the latest lastUpdated date for each idObject._id across all documents of a given externalId.

The query originally looked like this:

db.getCollection("CollectionOne").aggregate(
    [
        { $match: { 'externalId': <externalId>, 'coll.idObject._id': { $in: <array of IDs> } } },
        { $set : { 'coll' : { $filter : { 'input' : '$coll', as : 'collObject', cond : { $in : [ '$$collObject.idObject._id', <array of IDs> ] } } } } },
        { $unwind : '$coll' },
        { $group : { '_id' : '$coll.idObject._id', 'lastUpdated' : { $max : '$coll.lastUpdated' } } }
    ]
)

The params that are used as filters are the externalId and the array of IDs.

An example document looks like this:

{
    ...
    "externalId": "1",
    ...
    "coll": [
        {
            "idObject": {
                "_id": "12345678901"
            },
            "lastUpdated": { // an object because Java's `OffsetDateTime` has to be mapped manually as two fields
                "dateTime": "2023-05-18T00:47:00.000+00:00", // stored as the Date type
                "offset": "Z"
            },
            ...
        }
    ]
}

Normally we execute the query with an array of around 1000 IDs.
The collection size currently looks like this:

STORAGE SIZE: 500.9MB
LOGICAL DATA SIZE: 2.04GB
TOTAL DOCUMENTS: 868 724
INDEXES TOTAL SIZE: 130.92MB

The queries recently started taking 2-4 seconds, and we needed to make them a bit faster.

The indexes that exist in CollectionOne are:

IDX_ONE: { externalId: 1, 'coll.idObject._id': 1 },
IDX_TWO: { 'coll.idObject._id': 1 }

Output for one of the longer queries, taken from the MongoDB Atlas profiler page:

{
    "type": "command",
    "ns": "not_relevant",
    "command": {
      "aggregate": "CollectionOne",
      "pipeline": [
        {
          "$match": {
            "externalId": 1,
            "coll.idObject._id": {
              "$in": [
                // 1000 objects
              ]
            }
          }
        }
      ]
    },
    "planSummary": "IXSCAN { externalId: 1, coll.idObject._id: 1 }",
    "cursorid": 4581154899614066000,
    "keysExamined": 72131,
    "docsExamined": 456,
    "fromMultiPlanner": true,
    "replanned": true,
    "replanReason": "cached plan was less efficient than expected: expected trial execution to take 99 works but it took at least 990 works",
    "numYields": 0,
    "nreturned": 32,
    "queryHash": "EF6EF69E",
    "planCacheKey": "EFF6500A",
    "queryFramework": "classic",
    "reslen": 2796,
    "locks": {},
    "readConcern": {
      "level": "local",
      "provenance": "implicitDefault"
    },
    "storage": {
      "data": {
        "bytesRead": 73979545,
        "timeReadingMicros": 492304
      }
    },
    "remote": "xxx",
    "protocol": "op_msg",
    "durationMillis": 3808,
    "v": "6.0.5"
  }

At first I blamed the $group stage, because when I removed it, the query up to the $unwind step was pretty fast.
Later I tried removing the $set step instead, and that improved the execution time significantly.
The query now looks like the one below. It returns more results than it did with the $set stage, so the final filtering of the results is now done in Java code.

db.getCollection("CollectionOne").aggregate(
    [
        { $match: { 'externalId': <externalId>, 'coll.idObject._id': { $in: <array of IDs> } } },
        { $unwind : '$coll' },
        { $group : { '_id' : '$coll.idObject._id', 'lastUpdated' : { $max : '$coll.lastUpdated' } } },
    ]
)

And the explain("executionStats") output for the query above:

{
  explainVersion: '1',
  stages: [
    {
      '$cursor': {
        queryPlanner: {
          namespace: 'prod.CollectionOne',
          indexFilterSet: false,
          parsedQuery: {
            '$and': [
              { externalId: { '$eq': 1 } },
              {
                'coll.idObject._id': {
                  '$in': [
                    // ids
                  ]
                }
              }
            ]
          },
          queryHash: 'EF6EF69E',
          planCacheKey: 'EFF6500A',
          maxIndexedOrSolutionsReached: false,
          maxIndexedAndSolutionsReached: false,
          maxScansToExplodeReached: false,
          winningPlan: {
            stage: 'PROJECTION_SIMPLE',
            transformBy: { coll: 1, _id: 0 },
            inputStage: {
              stage: 'FETCH',
              inputStage: {
                stage: 'IXSCAN',
                keyPattern: { externalId: 1, 'coll.idObject._id': 1 },
                indexName: 'IDX_ONE',
                isMultiKey: true,
                multiKeyPaths: {
                  externalId: [],
                  'coll.idObject._id': [ 'coll' ]
                },
                isUnique: false,
                isSparse: false,
                isPartial: false,
                indexVersion: 2,
                direction: 'forward',
                indexBounds: {
                  externalId: [ '[1, 1]' ],
                  'coll.idObject._id': [
                    // ids
                  ]
                }
              }
            }
          },
          rejectedPlans: [
            {
              stage: 'PROJECTION_SIMPLE',
              transformBy: { coll: 1, _id: 0 },
              inputStage: {
                stage: 'FETCH',
                filter: {
                  'coll.idObject._id': {
                    '$in': [
                      //ids
                    ]
                  }
                },
                inputStage: {
                  stage: 'IXSCAN',
                  keyPattern: { externalId: 1, 'coll.idObject._id': 1 },
                  indexName: 'IDX_ONE',
                  isMultiKey: true,
                  multiKeyPaths: {
                    externalId: [],
                    'coll.idObject._id': [ 'coll' ]
                  },
                  isUnique: false,
                  isSparse: false,
                  isPartial: false,
                  indexVersion: 2,
                  direction: 'forward',
                  indexBounds: {
                    externalId: [ '[1, 1]' ],
                    'coll.idObject._id': [ '[MinKey, MaxKey]' ]
                  }
                }
              }
            },
            {
              stage: 'PROJECTION_SIMPLE',
              transformBy: { coll: 1, _id: 0 },
              inputStage: {
                stage: 'FETCH',
                filter: { externalId: { '$eq': 1 } },
                inputStage: {
                  stage: 'IXSCAN',
                  keyPattern: { 'coll.idObject._id': 1 },
                  indexName: 'IDX_TWO',
                  isMultiKey: true,
                  multiKeyPaths: { 'coll.idObject._id': [ 'coll' ] },
                  isUnique: false,
                  isSparse: false,
                  isPartial: false,
                  indexVersion: 2,
                  direction: 'forward',
                  indexBounds: {
                    'coll.idObject._id': [
                      // ids
                    ]
                  }
                }
              }
            }
          ]
        },
        executionStats: {
          executionSuccess: true,
          nReturned: 350,
          executionTimeMillis: 146,
          totalKeysExamined: 51835,
          totalDocsExamined: 350,
          executionStages: {
            stage: 'PROJECTION_SIMPLE',
            nReturned: 350,
            executionTimeMillisEstimate: 14,
            works: 51835,
            advanced: 350,
            needTime: 51484,
            needYield: 0,
            saveState: 61,
            restoreState: 61,
            isEOF: 1,
            transformBy: { coll: 1, _id: 0 },
            inputStage: {
              stage: 'FETCH',
              nReturned: 350,
              executionTimeMillisEstimate: 14,
              works: 51835,
              advanced: 350,
              needTime: 51484,
              needYield: 0,
              saveState: 61,
              restoreState: 61,
              isEOF: 1,
              docsExamined: 350,
              alreadyHasObj: 0,
              inputStage: {
                stage: 'IXSCAN',
                nReturned: 350,
                executionTimeMillisEstimate: 4,
                works: 51835,
                advanced: 350,
                needTime: 51484,
                needYield: 0,
                saveState: 61,
                restoreState: 61,
                isEOF: 1,
                keyPattern: { externalId: 1, 'coll.idObject._id': 1 },
                indexName: 'IDX_ONE',
                isMultiKey: true,
                multiKeyPaths: {
                  externalId: [],
                  'coll.idObject._id': [ 'coll' ]
                },
                isUnique: false,
                isSparse: false,
                isPartial: false,
                indexVersion: 2,
                direction: 'forward',
                indexBounds: {
                  externalId: [ '[1, 1]' ],
                  'coll.idObject._id': [
                    // ids
                  ]
                },
                keysExamined: 51835,
                seeks: 687,
                dupsTested: 51148,
                dupsDropped: 50798
              }
            }
          }
        }
      },
      nReturned: Long("350"),
      executionTimeMillisEstimate: Long("53")
    },
    {
      '$unwind': { path: '$coll' },
      nReturned: Long("70000"),
      executionTimeMillisEstimate: Long("63")
    },
    {
      '$group': {
        _id: '$coll.idObject._id',
        lastUpdated: { '$max': '$coll.lastUpdated' }
      },
      maxAccumulatorMemoryUsageBytes: { lastUpdated: Long("367714") },
      totalOutputDataSizeBytes: Long("575140"),
      usedDisk: false,
      spills: Long("0"),
      nReturned: Long("1146"),
      executionTimeMillisEstimate: Long("126")
    }
  ],
  serverInfo: {
    host: 'xxx',
    port: 12345,
    version: '6.0.5',
    gitVersion: 'xxx'
  },
  serverParameters: {
    internalQueryFacetBufferSizeBytes: 104857600,
    internalQueryFacetMaxOutputDocSizeBytes: 104857600,
    internalLookupStageIntermediateDocumentMaxSizeBytes: 104857600,
    internalDocumentSourceGroupMaxMemoryBytes: 104857600,
    internalQueryMaxBlockingSortMemoryUsageBytes: 104857600,
    internalQueryProhibitBlockingMergeOnMongoS: 0,
    internalQueryMaxAddToSetBytes: 104857600,
    internalDocumentSourceSetWindowFieldsMaxMemoryBytes: 104857600
  },
  command: {
    aggregate: 'CollectionOne',
    pipeline: [
      {
        '$match': {
          externalId: 1,
          'coll.idObject._id': {
            '$in': [
              // ids
            ]
          }
        }
      },
      { '$unwind': '$coll' },
      {
        '$group': {
          _id: '$coll.idObject._id',
          lastUpdated: { '$max': '$coll.lastUpdated' }
        }
      }
    ],
    cursor: {},
    '$db': 'prod'
  },
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1684360973, i: 1 }),
    signature: {
      hash: Binary(Buffer.from("xxx", "hex"), 0),
      keyId: Long("xxx")
    }
  },
  operationTime: Timestamp({ t: 1684360973, i: 1 })
}

If you would like to see the explain output for the first query, I can provide it, but not before the afternoon :)

Does anybody have any suggestions here?

Complicated use cases take more time, and most of us here are not paid to do that, while you probably are by your employer. So sometimes you need to be patient.

1 - Doing $in on 1000 elements is probably expensive no matter what
2 - If this is a very frequent use case, it might be worthwhile to keep lastUpdated per idObject in a separate collection, some kind of computed pattern
3 - Do you really need the object idObject that holds the single field _id?
4 - It might help to simplify the $filter to

{ 'input' : '$coll.idObject._id', as : 'id', cond : { $in : [ '$$id', <array of IDs> ] } }
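
Suggestion 2 above (the computed pattern) could be sketched roughly as follows. This is only an illustration under assumptions: the summary collection name LastUpdatedByIdObject, the field name idObjectId, and the helper buildLastUpdatedUpsert are hypothetical, and only the dateTime part of lastUpdated is tracked. It relies on the $max update operator, which overwrites a field only when the new value is greater than the stored one.

```javascript
// Hypothetical helper: builds the arguments for an updateOne() upsert that
// keeps one summary document per (externalId, idObject._id) pair.
// Collection and field names here are assumptions, not the original schema.
function buildLastUpdatedUpsert(externalId, idObjectId, dateTime) {
  return {
    // One summary document per externalId / idObject._id combination.
    filter: { externalId: externalId, idObjectId: idObjectId },
    // $max replaces the stored value only if the new dateTime is greater.
    update: { $max: { lastUpdated: dateTime } },
    // Create the summary document the first time this pair is seen.
    options: { upsert: true },
  };
}

// In mongosh this might be used whenever a coll element is written:
// const op = buildLastUpdatedUpsert("1", "12345678901", new Date());
// db.getCollection("LastUpdatedByIdObject").updateOne(op.filter, op.update, op.options);
```

With such a collection in place, the read side would no longer need $unwind and $group: a plain find on externalId and idObjectId with $in should return the precomputed values, at the cost of one extra write per update.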

Hello Steeve!
I'm not pushing you to answer; I'm just trying to find a solution and wanted to refresh the topic after a month :)

I was thinking about it: lowering it to 100 elements speeds up the query a little, but running it 10 times adds up to a similar total execution time.

That's a good idea.
Everything I do up to the $group stage is extremely fast. Sometimes I have up to 120k records after the penultimate step, and then the grouping takes 10 seconds.
I thought that maybe grouping in Java would be more efficient.
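
A minimal sketch of what that client-side grouping could look like, written in JavaScript to match the shell snippets in this thread (the same loop translates directly to Java). The function name maxLastUpdatedById and the input shape are assumptions: it expects the unwound coll elements and compares only lastUpdated.dateTime, which may differ from how $max orders whole embedded documents on the server.

```javascript
// Hypothetical helper: entries are unwound `coll` elements shaped like
// { idObject: { _id }, lastUpdated: { dateTime, offset } }.
function maxLastUpdatedById(entries, wantedIds) {
  const wanted = new Set(wantedIds); // O(1) membership checks
  const latest = new Map();          // idObject._id -> newest lastUpdated
  for (const entry of entries) {
    const id = entry.idObject._id;
    // Skip IDs that were not requested (the role the $filter stage played).
    if (!wanted.has(id)) continue;
    const current = latest.get(id);
    // Assumption: dateTime is a Date or an ISO-8601 string.
    if (!current ||
        new Date(entry.lastUpdated.dateTime) > new Date(current.dateTime)) {
      latest.set(id, entry.lastUpdated);
    }
  }
  return latest;
}
```

This makes a single pass over the entries, so its cost grows linearly with the number of unwound elements. Whether it beats the server-side $group depends on how much data has to cross the network first.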

The main idea behind that query is to get lastUpdated for an externalId, grouped by _id.
Maybe keeping it in a separate collection is what I'm looking for. Or I will find another way to get that information and keep it updated.

It holds the ID type along with the ID string; I did not mention it in the example document, sorry.

I will give it a go! :)

I also tried to get that information without filtering. For some externalIds that is around 1 million (1kk) documents before the $group stage, and the grouping takes up to 10 seconds.
Getting the documents takes ~0.1/2 sec per externalId because it uses an index.
After that, the grouping slows things down significantly. Thank you for your ideas, I'll try something tomorrow.