Aggregation Pipeline: Distribute Documents into Arrays

Hi,

I am trying to write an aggregation pipeline to aggregate values in an array. Our data model looks something like this:

{
  "_id": {
    "$oid": "64e89dd304c2da1a0022b0d6"
  },
  "size": 2, // number of documents in data
  "version": "abc_Unaggregated",
  "data": [
    {
      "request": {
        "a": "x",
        "b": "y",
        "c": "z"
      }
    },
    {
      "request": {
        "a": "x",
        "b": "y",
        "c": "z"
      }
    }
  ]
}

The size of the data array is limited to 100 entries per document. We have about 1.5 billion of those entries, of which only about 10% are really distinct. So the idea is to aggregate the duplicate requests into one. In the example above, instead of the 2 requests, only one would be left.

The general way I did it is to create an aggregation pipeline with the following steps:

  1. match documents for the correct version (“abc_Unaggregated”)
  2. unwind data
  3. group by {data.request.a, data.request.b, data.request.c, version}
  4. change the version to the aggregated one (“abc”)
  5. project to get the right data structure
  6. group with $push into a new data array
[
  {
    // keep only documents that have not been aggregated yet
    $match: {
      version: "abc_Unaggregated",
    },
  },
  {
    // one document per request entry
    $unwind: {
      path: "$data",
    },
  },
  {
    // deduplicate: one group per distinct (a, b, c, version) combination
    $group: {
      _id: {
        a: "$data.request.a",
        b: "$data.request.b",
        c: "$data.request.c",
        version: "$version",
      },
    },
  },
  {
    // strip the "_Unaggregated" suffix, e.g. "abc_Unaggregated" -> "abc"
    $set: {
      "_id.version": {
        $replaceOne: {
          input: "$_id.version",
          find: "_Unaggregated",
          replacement: "",
        },
      },
    },
  },
  {
    // restore the original document structure
    $project: {
      "placeholder.request": {
        a: "$_id.a",
        b: "$_id.b",
        c: "$_id.c",
      },
      version: "$_id.version",
    },
  },
  {
    // collect all unique requests of a version into one data array
    $group: {
      _id: {
        version: "$version",
      },
      data: {
        $push: "$placeholder",
      },
    },
  },
]

The problem here is that the resulting data array would contain 1.5 million or more entries. That is way too much for a single document. Is there a way to distribute the push across different documents, so that the end result is multiple documents, each containing a data array with 100 entries?
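To make the desired result concrete, it would ideally look something like this (hypothetical illustration; _id values omitted and entries shortened):

[
  {
    "size": 100,
    "version": "abc",
    "data": [
      { "request": { "a": "x", "b": "y", "c": "z" } }
      // ... 99 more unique requests
    ]
  },
  {
    "size": 100,
    "version": "abc",
    "data": [
      // ... the next 100 unique requests
    ]
  }
  // ... and so on, until all unique requests are covered
]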


I tried $slice, $limit and $bucket. None of these seem to be applicable to my problem.
$limit only cuts documents away instead of distributing them.
$slice seems to do the same: I can decrease the number of elements, but not distribute them.
With $bucket I would need to specify boundaries that miraculously yield a size of 100 per bucket, which would call for variable boundaries, so it is not really applicable to this problem either.

There are some more steps, e.g. for merging back into the collection, but they are not relevant to my question.

We are using MongoDB Atlas, version 5.0.19.

Hello, @Philipp_Reichling! Welcome to the community :wave:

Idea
In your case I can suggest changing the flow a bit:

  1. Mark-select the documents to be processed (update with the updateMany() method).
  2. Get the list of unique requests (aggregation pipeline with $merge).
  3. Update the version of the processed documents (update with the updateMany() method).

To demonstrate the idea, I would take your dataset example and extend it a bit, like so:

db.requests.insertMany([
  {
    _id: 'R1',
    size: 2,
    // possible values: 'justAdded', 'toProcess', 'processed'
    version: 'justAdded', 
    data: [
      {
        request: { // unique (1)
          a: 'x',
          b: 'y',
          c: 'z'
        }
      },
      {
        request: {
          a: 'x',
          b: 'y',
          c: 'z'
        }
      }
    ]
  },
  {
    _id: 'R2',
    size: 3,
    version: 'justAdded',
    data: [
      {
        request: {
          a: 'k', // new, unique (2)
          b: 'y',
          c: 'z'
        }
      },
      {
        request: {
          a: 'm', // new, unique (3)
          b: 'y',
          c: 'z'
        }
      },
      {
        request: {
          a: 'm', // new
          b: 'y',
          c: 'z'
        }
      }
    ]
  }
]);

I assume that your documents have a versioning process like this:

  • justAdded - the initial stage of a document. The request objects in its data array are not yet considered processed. All documents start with this stage (or version).
  • toProcess - documents that are about to be processed.
  • processed - documents that have already been processed.

You can adapt this to use 0, 1, 2 numbering instead of textual stages, or something similar. You can also add a separate field to your documents to support the suggested flow, if needed.
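For example, a variant with a dedicated status field could look like this (just a sketch; the processingStage field name is made up):

// hypothetical: track the processing stage in a separate field,
// leaving the existing version value untouched
db.requests.updateMany(
  { processingStage: { $exists: false } },
  { $set: { processingStage: 'justAdded' } }
);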

1. Mark-select documents to process.

This is needed so that the version-update operation knows which documents to update after the processing aggregation has run.

db.requests.updateMany(
  {
    version: 'justAdded'
  },
  {
    $set: {
      version: 'toProcess'
    }
  }
);
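
Note: with ~1.5 billion documents, both updateMany() calls and the later $match will most likely only be practical with an index on version (an assumption about your setup; adjust to the indexes you already have):

db.requests.createIndex({ version: 1 });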

2. Process documents
This will write the unique requests into a separate collection, requestsProcessed, which gets populated further with every subsequent run of the processing aggregation. From this collection you can get the list of unique request objects.

db.requests.aggregate([
  {
    $match: {
      version: 'toProcess'
    }
  },
  {
    $unwind: '$data',
  },
  {
    $project: {
      _id: '$data.request'
    }
  },
  { 
    $merge: {
      into: 'requestsProcessed',
      on: '_id',
      whenMatched: 'keepExisting',
      whenNotMatched: 'insert'
    }
  }
]);

Unique documents in the requestsProcessed collection:

[
  { _id: { a: 'x', b: 'y', c: 'z' } },
  { _id: { a: 'k', b: 'y', c: 'z' } },
  { _id: { a: 'm', b: 'y', c: 'z' } }
]
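
If you need them back in their original shape, a small projection over requestsProcessed can rename _id back to request (a sketch based on the layout above):

db.requestsProcessed.aggregate([
  // turn { _id: { a, b, c } } back into { request: { a, b, c } }
  { $project: { _id: 0, request: '$_id' } }
]);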

3. Update the version of the processed documents
This way you avoid documents being processed more than once.

db.requests.updateMany(
  {
    version: 'toProcess'
  },
  {
    $set: {
      version: 'processed'
    }
  }
);
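
If you also need the unique requests distributed into documents of at most 100 entries, as in your original question, one possible follow-up (a sketch only, not tested at scale) is to number the documents in requestsProcessed with $setWindowFields / $documentNumber, available since MongoDB 5.0, and group on that number divided by 100:

db.requestsProcessed.aggregate([
  {
    // assign a 1-based running number to every unique request;
    // sorting this many documents will likely need allowDiskUse
    $setWindowFields: {
      sortBy: { _id: 1 },
      output: { idx: { $documentNumber: {} } }
    }
  },
  {
    // requests 1-100 go into chunk 0, 101-200 into chunk 1, and so on
    $group: {
      _id: { $floor: { $divide: [{ $subtract: ['$idx', 1] }, 100] } },
      data: { $push: { request: '$_id' } }
    }
  },
  {
    // rebuild the original document shape
    $project: {
      _id: 0,
      size: { $size: '$data' },
      version: { $literal: 'abc' }, // or whatever aggregated version label you use
      data: 1
    }
  }
  // append $merge or $out here to write the batched documents where you need them
], { allowDiskUse: true });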

Let me know if this solution works for you :wink:
