Need Help with Complex Aggregation: Joining 10 Collections in MongoDB

I’m asking for your help, MongoDB experts! First of all, thank you for taking the time to read this.
I encountered the following scenario: I’m running an aggregation on the MasterCollection collection and “joining” it with 9 other collections.
In the end, I’m merging everything back into the same MasterCollection. The aggregation takes 30 minutes to run, which is not acceptable. We have a single MongoDB instance with 16GB RAM, running in a Docker container.
The MasterCollection has 1015787 documents, with an average document size of 1.8kB. Additional stats for the other collections:
Collection name    Number of documents    Avg doc size
collection1        1016878                40B
collection2        0                      0B
collection3        232                    94B
collection4        10289                  97B
collection5        10289                  97B
collection6        1747                   102B
collection7        1326                   103B
collection8        1016878                42B
collection9        1016878                58B
Compound indexes are created for the fields that are used in the lookups.

My aggregation looks like this:

MasterCollection.aggregate([
  {
    $project: {
      _id: 1,
      field1: 1,
      field2: 1,
      field3: 1,
    },
  },
  {
    $lookup: {
      from: 'collection1',
      localField: '_id',
      foreignField: '_id',
      as: 'collection1',
    },
  },
  {
    $lookup: {
      from: 'collection8',
      localField: '_id',
      foreignField: '_id',
      as: 'collection8',
    },
  },
  {
    $lookup: {
      from: 'collection9',
      localField: '_id',
      foreignField: '_id',
      as: 'collection9',
    },
  },
  {
    $lookup: {
      from: 'collection2',
      let: {
        field1Id: '$field1',
        field2Id: '$field2',
      },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ['$_id.field1', '$$field1Id'] },
                { $eq: ['$_id.field2', '$$field2Id'] },
              ],
            },
          },
        },
        {
          $project: {
            _id: 0,
            fieldFromCollection2: 1,
          },
        },
      ],
      as: 'collection2',
    },
  },
  {
    $lookup: {
      from: 'colelction3',
      let: {
        field1Id: '$field1',
        field2Id: '$field2',
      },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ['$_id.field1', '$$field1Id'] },
                { $eq: ['$_id.field2', '$$field2Id'] },
              ],
            },
          },
        },
        {
          $project: {
            _id: 0,
            fieldFromCollection3: 1,
          },
        },
      ],
      as: 'colelction3',
    },
  },
  {
    $lookup: {
      from: 'collection4',
      let: {
        field1Id: '$field1',
        field2Id: '$field2',
      },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ['$_id.field1', '$$field1Id'] },
                { $eq: ['$_id.field2', '$$field2Id'] },
              ],
            },
          },
        },
        {
          $project: {
            _id: 0,
            fieldFromCollection4: 1,
          },
        },
      ],
      as: 'collection4',
    },
  },
  {
    $lookup: {
      from: 'collection5',
      let: {
        field1Id: '$field1',
        field2Id: '$field2',
      },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ['$_id.field1', '$$field1Id'] },
                { $eq: ['$_id.field2', '$$field2Id'] },
              ],
            },
          },
        },
        {
          $project: {
            _id: 0,
            fieldFromCollection5: 1,
          },
        },
      ],
      as: 'collection5',
    },
  },
  {
    $lookup: {
      from: 'collection6',
      let: {
        field1Id: '$field1',
        field3Id: '$field3',
      },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ['$_id.field1', '$$field1Id'] },
                { $eq: ['$_id.field3', '$$field3Id'] },
              ],
            },
          },
        },
        {
          $project: {
            _id: 0,
            fieldFromCollection6: 1,
          },
        },
      ],
      as: 'collection6',
    },
  },
  {
    $lookup: {
      from: 'collection7',
      let: {
        field1Id: '$field1',
        field2Id: '$field2',
      },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ['$_id.field1', '$$field1Id'] },
                { $eq: ['$_id.field2', '$$field2Id'] },
              ],
            },
          },
        },
        {
          $project: {
            _id: 0,
            fieldFromCollection7: 1,
          },
        },
      ],
      as: 'collection7',
    },
  },
  // ...followed by one $unwind stage for each joined collection
  {
    $project: {
      _id: 1,
      // project from each collection
    },
  },
  {
    $merge: {
      into: 'MasterCollection',
      on: '_id',
      whenMatched: 'merge',
      whenNotMatched: 'discard',
    },
  },
], { allowDiskUse: true })

It is indeed not acceptable, most likely because the data is laid out like a normalized SQL schema. The aggregation also looks redacted from the real thing: it reads as if all the parameters became parafoots when converted from metric to US. (I am pretty sure you have no collection called colelction3.) It is misspelled twice, which is what happens when you do a global substitution with the wrong text.

What is the real use case? Is it a migration process to permanently convert from an SQL schema to a better schema?

Your system might be starved for memory with that much data in a single pipeline. You could investigate doing each $lookup/$merge in its own, completely independent pipeline. You may then avoid an $unwind for collections where foreignField is _id.
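
A minimal sketch of that idea, using the placeholder names from the question. `buildLookupMergePipeline` is a hypothetical helper, and storing the joined document under a field named after the collection is an assumption, since the final $project in the question is elided. Because _id is unique, the joined array has at most one element, so $arrayElemAt can stand in for $unwind:

```javascript
// Hypothetical helper (not from the original post): builds one small
// $lookup + $merge pipeline for a single secondary collection joined on _id.
function buildLookupMergePipeline(from) {
  return [
    // Only _id is needed to drive the join.
    { $project: { _id: 1 } },
    { $lookup: { from: from, localField: '_id', foreignField: '_id', as: from } },
    // _id is unique in the secondary collection, so the joined array holds at
    // most one element; take it directly instead of adding an $unwind stage.
    { $set: { [from]: { $arrayElemAt: ['$' + from, 0] } } },
    {
      $merge: {
        into: 'MasterCollection',
        on: '_id',
        whenMatched: 'merge',
        whenNotMatched: 'discard',
      },
    },
  ];
}

// One independent pipeline per collection that is joined on _id.
const pipelines = ['collection1', 'collection8', 'collection9'].map(
  (name) => buildLookupMergePipeline(name)
);

// In mongosh, each pipeline would then be run on its own, for example:
//   pipelines.forEach((p) => db.MasterCollection.aggregate(p));
console.log(JSON.stringify(pipelines[0], null, 2));
```

Unlike a plain $unwind, this does not drop master documents that have no match, so the result may differ slightly from the single big pipeline.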

You are right, I introduced some typos, and the original collections have different names :)

It is not a migration process. We have an ETL application, and most of the data in the other collections consists of counts computed over different groupings.

As far as I recall, the team already tried running the $lookup/$merge steps in completely independent pipelines, and the execution took much longer.

Get better hardware. More RAM, faster disk.

Have a better schema. Move whatever is stored dynamically in the secondary collections into MasterCollection so that you do not have to $lookup and then $merge.

Try removing the first $project.

For collections where you $lookup with foreignField:_id, reverse the process. Aggregate the secondary collection and then $merge like:

collection1.aggregate( [ { "$merge" : {
  "into" : "MasterCollection" , ...

You save on some $lookup and most likely you can avoid allowDiskUse for those.
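
To make the reversed direction concrete, here is a rough sketch. The $merge options simply mirror the ones in the question, and `buildReverseMergePipeline` and `fieldFromCollection1` are hypothetical names used for illustration only:

```javascript
// Hypothetical helper: builds a pipeline that is run ON the secondary
// collection and pushes selected fields into MasterCollection, matching on _id.
function buildReverseMergePipeline(fieldsToCopy) {
  if (fieldsToCopy.length === 0) {
    throw new Error('at least one field to copy is required');
  }
  // $project keeps _id by default, and _id is the key that $merge matches on.
  const projection = {};
  for (const field of fieldsToCopy) {
    projection[field] = 1;
  }
  return [
    { $project: projection },
    {
      $merge: {
        into: 'MasterCollection',
        on: '_id',
        whenMatched: 'merge', // add the projected fields to the existing document
        whenNotMatched: 'discard', // never create new master documents
      },
    },
  ];
}

// 'fieldFromCollection1' is a made-up field name.
const reversePipeline = buildReverseMergePipeline(['fieldFromCollection1']);

// In mongosh this would be run against the secondary collection:
//   db.collection1.aggregate(reversePipeline);
console.log(JSON.stringify(reversePipeline, null, 2));
```

There is no $lookup stage at all here: the secondary collection is scanned once and $merge finds each target document through the _id index of MasterCollection.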

Most likely you could also do the same with other collections by using:

on : [ "field1" , "field2" ]

But field1 and field2 would need to be at the top level rather than within _id in the secondary collections, which you could easily do with a $set or $project.
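
A sketch of what that could look like, with a hypothetical helper name. One caveat to check before relying on it: $merge requires a unique index in the target collection on exactly the fields listed in `on`, so this only works if each (field1, field2) pair identifies at most one document in MasterCollection.

```javascript
// Hypothetical helper: hoists the key fields out of the compound _id of a
// secondary collection and merges into MasterCollection on those fields.
function buildCompoundKeyMergePipeline(keyFields, valueField) {
  // Drop the secondary collection's compound _id so it is never merged over
  // the master document's own _id; keep only the value being copied.
  const projection = { _id: 0, [valueField]: 1 };
  for (const key of keyFields) {
    // e.g. field1: '$_id.field1' moves the key to the top level.
    projection[key] = '$_id.' + key;
  }
  return [
    { $project: projection },
    {
      $merge: {
        into: 'MasterCollection',
        // Requires a unique index on these fields in MasterCollection.
        on: keyFields,
        whenMatched: 'merge',
        whenNotMatched: 'discard',
      },
    },
  ];
}

const collection4Pipeline = buildCompoundKeyMergePipeline(
  ['field1', 'field2'],
  'fieldFromCollection4'
);
const collection6Pipeline = buildCompoundKeyMergePipeline(
  ['field1', 'field3'],
  'fieldFromCollection6'
);

// In mongosh:
//   db.collection4.aggregate(collection4Pipeline);
//   db.collection6.aggregate(collection6Pipeline);
console.log(JSON.stringify(collection4Pipeline, null, 2));
```

If many master documents share the same field1/field2 values, that unique index cannot be created, and the $lookup from MasterCollection remains the way to go for those collections.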

I expect that starting the aggregate() on the secondary collections would cut the time in half, as the number of $lookup stages is halved, and in all cases disk would not be used.

Hi @Gergo_Boros, is there any followup on this?

Thanks in advance

Hi @steevej, I managed to reduce the number of joins and updated the $match conditions so that the _id index can be used where I’m matching on multiple fields:

{ $eq: ['$_id.field1', '$$field1Id'] },
{ $eq: ['$_id.field2', '$$field2Id'] },

The performance improvement was 10 minutes, from ~30 mins to ~20 mins. However, this is still a lot. I moved the same data to PostgreSQL and, without any major optimizations, ran the same query on the same data in under 9 seconds.

Initially, this query was split into multiple queries, where we aggregated the secondary collections as you suggested. I did not implement that part, but I recall the execution time was around 2-3 hours.