Getting most recent documents in time series collection

Hi all,

We use a time series collection and need to regularly collect and display the latest 50, 100, or 250 documents of a device. Individual entries must be unique and updatable, so duplicates must always be filtered out (using a $group). I’m new to MongoDB and stuck on calculating the document count.

I have two ideas:

  1. Aggregate the documents via the system.buckets collection and use its control.count field to determine the current document count, then stop as soon as the desired limit is reached. However, I don’t know how to do this dynamically and, more importantly, directly in the aggregation. Currently I just guess the $limit.

  2. Use the time series collection (not the internal bucket collection) and limit documents using a time window. Yet, I don’t know how to dynamically increase the window in the aggregation.

Here are two examples.
Example aggregation using the time series collection:

const cursor = await db.collection("test").aggregate([
    {
      $match: {
        "meta.ti": ti,
        "meta.type": "P",
        ts: {
          $lte: new Date("2022-05-27T16:40:29.000Z"),
          $gte: new Date("2022-05-09T06:10:22.000Z"),
        },
      },
    },
    {
      $group: {
        _id: {
          ti: "$meta.ti",
          trace: "$trace",
          ts: {
            $dateTrunc: {
              date: "$ts",
              unit: "day",
              startOfWeek: "monday",
              binSize: 1,
            },
          },
        },
        docs: {
          $topN: {
            output: "$$ROOT",
            sortBy: {
              _id: 1,
            },
            n: 1,
          },
        },
      },
    },
    {
      $sort: {
        "docs.ts": -1,
      },
    },
  ]);

Example aggregation using the system.buckets collection:

const cursor = await db.collection("system.buckets.test").aggregate([
    {
      $match: {
        "meta.ti": ti,
        "meta.type": "P",
      },
    },
    { $limit: 2 },
    {
      $_internalUnpackBucket: {
        timeField: "ts",
        metaField: "meta",
        bucketMaxSpanSeconds: 3600,
      },
    },
    {
      $group: {
        _id: {
          ti: "$meta.ti",
          trace: "$trace",
          ts: {
            $dateTrunc: {
              date: "$ts",
              unit: "day",
              startOfWeek: "monday",
              binSize: 1,
            },
          },
        },
        docs: {
          $topN: {
            output: "$$ROOT",
            sortBy: {
              _id: 1,
            },
            n: 1,
          },
        },
      },
    },
    {
      $sort: { "docs.ts": -1 },
    },
  ]);

Devices provide data points at different insertion rates (i.e. some devices insert within seconds and others within days to weeks). So if I just guess the time span or the limit, I will scan too few documents for some devices and too many for others.

Is it possible to avoid this under- and over-fetching in both scenarios? Is there a smarter way to approach this problem?

Thanks,
Ben

Hi @Benjamin_Behringer,

Regarding your first approach, I would avoid it: the system.buckets collection is an internal implementation detail and might change in future MongoDB versions as the feature is improved.

To get a better idea of what you’re trying to achieve with the second approach, would you be able to provide the following information:

  1. MongoDB Version in use.
  2. Sample document(s).
  3. Expected output.
  4. “Yet, I don’t know how to dynamically increase the window in the aggregation.” - Could you clarify this? Do you mean increasing the lower / upper bound of the time range within which documents are searched? Would you be able to provide an example of this scenario as well?
  5. “so duplicates must always be filtered out” - How are possible duplicates created and do you see this often enough that it affects your results?

Regards,
Jason

Hi Jason,

Thanks for your response. Here are the details:

  1. MongoDB Version in use.

MongoDB Version 6.0.0-rc7 (Atlas)

  2. Sample document(s).

Here’s an example with infrequent data.

{
  "ts": ISODate('2022-05-27T12:29:37.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3383,
  "_id": ObjectId("62b790d8510c3407edfb7434")
},
{
  "ts": ISODate('2022-04-20T11:24:48.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3382,
  "_id": ObjectId("62b790c8510c3407edfb092d")
},
{
  "ts": ISODate('2022-03-25T14:56:28.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3381,
  "_id": ObjectId("62b790bc510c3407edfab8bd")
},
{
  "ts": ISODate('2022-03-25T14:51:38.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3380,
  "_id": ObjectId("62b790bc510c3407edfab8bf")
},
{
  "ts": ISODate('2022-03-25T14:51:14.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3380,
  "_id": ObjectId("62b790bc510c3407edfab8c3")
},
{
  "ts": ISODate('2022-02-26T10:32:37.000+00:00'),
  "meta": { "ti": 1 },
  "receipt": 3379,
  "_id": ObjectId("62b790b1510c3407edfa720b")
}

  3. Expected output.

I want to be able to say “give me the last 2 documents”, which gives me:

{
  "ts": ISODate('2022-05-27T12:29:37.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3383,
  "_id": ObjectId("62b790d8510c3407edfb7434")
},
{
  "ts": ISODate('2022-04-20T11:24:48.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3382,
  "_id": ObjectId("62b790c8510c3407edfb092d")
}

Or the last 5, which gives me:

{
  "ts": ISODate('2022-05-27T12:29:37.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3383,
  "_id": ObjectId("62b790d8510c3407edfb7434")
},
{
  "ts": ISODate('2022-04-20T11:24:48.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3382,
  "_id": ObjectId("62b790c8510c3407edfb092d")
},
{
  "ts": ISODate('2022-03-25T14:56:28.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3381,
  "_id": ObjectId("62b790bc510c3407edfab8bd")
},
{
  "ts": ISODate('2022-03-25T14:51:38.000+00:00'),
  "meta": { "ti": 1 },
  "trace": 3380,
  "_id": ObjectId("62b790bc510c3407edfab8bf")
},
{
  "ts": ISODate('2022-02-26T10:32:37.000+00:00'),
  "meta": { "ti": 1 },
  "receipt": 3379,
  "_id": ObjectId("62b790b1510c3407edfa720b")
}

Note that trace 3380 is duplicated in the collection and thus only the most recent document with this trace is valid and printed (cf. $group in the first post).
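
As a rough illustration of that rule, here is a minimal plain-JavaScript sketch that applies "the latest document per trace wins" to an in-memory array. It is not the aggregation itself; `latestUniqueByTrace` is a hypothetical helper name, and the sample array reuses only the documents above that carry a trace field.

```javascript
// Illustrative only: mirrors the "latest document per trace wins" rule in
// plain JavaScript. `latestUniqueByTrace` is a hypothetical helper, not a
// MongoDB API.
function latestUniqueByTrace(docs, n) {
  const newestPerTrace = new Map();
  for (const doc of docs) {
    const seen = newestPerTrace.get(doc.trace);
    // Keep the document with the latest ts for each trace number.
    if (!seen || doc.ts > seen.ts) {
      newestPerTrace.set(doc.trace, doc);
    }
  }
  // Newest first, then cut to the requested count.
  return [...newestPerTrace.values()]
    .sort((a, b) => b.ts - a.ts)
    .slice(0, n);
}

// Sample data taken from the documents above (only those with a trace field).
const sampleDocs = [
  { ts: new Date("2022-05-27T12:29:37.000Z"), meta: { ti: 1 }, trace: 3383 },
  { ts: new Date("2022-04-20T11:24:48.000Z"), meta: { ti: 1 }, trace: 3382 },
  { ts: new Date("2022-03-25T14:56:28.000Z"), meta: { ti: 1 }, trace: 3381 },
  { ts: new Date("2022-03-25T14:51:38.000Z"), meta: { ti: 1 }, trace: 3380 },
  { ts: new Date("2022-03-25T14:51:14.000Z"), meta: { ti: 1 }, trace: 3380 },
];

const lastFour = latestUniqueByTrace(sampleDocs, 4);
console.log(lastFour.map((d) => d.trace));
```

For trace 3380 the sketch keeps the 14:51:38 document and drops the earlier 14:51:14 one, which matches the expected output listed above.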

  4. “Yet, I don’t know how to dynamically increase the window in the aggregation.” - Clarification on this and if you mean increasing the lower / upper bound of the time range in which documents are searched for within. Would you be able to provide an example on this scenario as well?

Let’s use the example above again. If I want to collect the “last two” transactions, I need at least:

$match: {
        "meta.ti": 1,
        ts: {
          $gte: ISODate("2022-04-20T11:24:48.000+00:00"),
        },
      },

Some devices (identified by meta.ti) have a higher insertion frequency, e.g., hundreds of transactions a day. So just using time in the $match to retrieve documents leads to over-fetching or under-fetching.
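
One way to picture the "dynamically growing window" is a client-side loop that widens the window until enough documents come back. The sketch below is only an illustration under assumptions: it moves the widening out of the aggregation and into application code, `fetchLatestByWidening` is a hypothetical helper, and `fetchWindow(from, to)` is an assumed callback that runs the $match/$group aggregation for one window and resolves to an array of de-duplicated documents.

```javascript
// Hypothetical client-side loop: widen the time window until enough
// documents are found or the window reaches `maxWindowMs`.
// `fetchWindow(from, to)` is an assumed callback, not a driver API.
async function fetchLatestByWidening(fetchWindow, wanted, options = {}) {
  const now = options.now ?? new Date();
  const maxWindowMs = options.maxWindowMs ?? 365 * 24 * 60 * 60 * 1000;
  let windowMs = options.initialWindowMs ?? 60 * 60 * 1000; // start at 1 hour

  for (;;) {
    const from = new Date(now.getTime() - windowMs);
    const docs = await fetchWindow(from, now);
    if (docs.length >= wanted || windowMs >= maxWindowMs) {
      // Newest first, capped at the requested count.
      return docs.sort((a, b) => b.ts - a.ts).slice(0, wanted);
    }
    windowMs = Math.min(windowMs * 2, maxWindowMs); // double and retry
  }
}
```

The trade-off is extra round trips for slow devices: each doubling re-runs the query over a larger range, so busy devices finish on the first attempt while sparse ones need several.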

  5. “so duplicates must always be filtered out” - How are possible duplicates created and do you see this often enough that it affects your results?

In a sample of 50,000 documents, 119 are duplicates. Since the documents track financial transactions from multiple legacy systems, this would indeed affect results :) In particular, each transaction has a trace number. A trace can be duplicated, indicating that the prior transaction with this trace failed (cf. the example). Thus, only the transaction with the latest transaction time for a given trace is valid. Since we don’t care about failed transactions, we already remove duplicates while inserting, but this is not guaranteed to catch all duplicated traces. It would be great if we could change the status of a transaction, but updating fields does not currently seem to be possible with time series collections.

Hope this helps to understand the problem.

Thanks
Ben

Hi @Benjamin_Behringer - Thank you for providing the detailed response :)

“I want to be able to say ‘give me the last 2 documents’, which gives me:”

I’ve come up with a possible working aggregation which appears to provide an output similar to your expected output:

tsdb>db.collection.aggregate(
{$match:{"meta.ti":1}},
{$group:{_id:{trace:"$trace"},ts:{$max:"$ts"},meta:{"$max":"$meta"}}},
{$group:{_id:{ti:"$meta.ti"},data: { '$topN': { output: '$$ROOT', sortBy: { ts: -1 }, n: 2 } }}})
[
  {
    _id: { ti: 1 },
    data: [
      {
        _id: { trace: 1962 },
        ts: ISODate("2022-12-30T13:42:10.017Z"),
        meta: { ti: 1 }
      },
      {
        _id: { trace: 4725 },
        ts: ISODate("2022-12-28T10:40:41.485Z"),
        meta: { ti: 1 }
      }
    ]
  }
]

Please note that I have the following indexes in my test environment:

  • {"meta.ti":1}
  • {"ts":1}

“Note that trace 3380 is duplicated in the collection and thus only the most recent document with this trace is valid and printed (cf. $group in the first post).”

The aggregation above has two $group stages. The first removes duplicate entries that share a trace value, keeping the latest ts. As they are duplicates, I presume the meta field data would be the same, so I have just used the $max accumulator on this field. Correct me if I am wrong here.

The second $group is similar to the one in your initial post, but with n set to the number of documents you want returned per device, sorted by ts descending.
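
To make n adjustable, the two stages described above could be wrapped in a small builder. This is only a sketch: `buildDedupTopNPipeline` is a hypothetical helper name, and the field names (meta.ti, trace, ts) are taken from the sample documents in this thread.

```javascript
// Builds the two-stage de-duplication + top-N pipeline described above.
// `buildDedupTopNPipeline` is a hypothetical helper; it only assembles the
// stages and does not talk to the database.
function buildDedupTopNPipeline(ti, n) {
  return [
    { $match: { "meta.ti": ti } },
    // Stage 1: one entry per trace, keeping the latest timestamp.
    {
      $group: {
        _id: { trace: "$trace" },
        ts: { $max: "$ts" },
        meta: { $max: "$meta" },
      },
    },
    // Stage 2: per device, keep the n newest entries.
    {
      $group: {
        _id: { ti: "$meta.ti" },
        data: { $topN: { output: "$$ROOT", sortBy: { ts: -1 }, n } },
      },
    },
  ];
}

console.log(JSON.stringify(buildDedupTopNPipeline(1, 5), null, 2));
```

The returned array can be passed to aggregate() as-is, for example for the last 50, 100, or 250 documents of a device.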

Because the aggregation cleans the data at query time, there will be a performance hit, which I understand is not ideal. If you need this query to be more performant, you could consider a separate process that filters out or removes the duplicates before you query the top-N documents.

Running the same aggregation against a test collection that contains duplicate documents with a trace value of 3380:

tsdb>db.collection.aggregate(
{$match:{"meta.ti":1}},
{$group:{_id:{trace:"$trace"},ts:{$max:"$ts"},meta:{"$max":"$meta"}}},
{$group:{_id:{ti:"$meta.ti"},data: { '$topN': { output: '$$ROOT', sortBy: { ts: -1 }, n: 2 } }}})
[
  {
    _id: { ti: 1 },
    data: [
      {
        _id: { trace: 3380 },
        ts: ISODate("2022-03-25T14:51:38.000Z"),
        meta: { ti: 1 }
      }
    ]
  }
]

If you do use this aggregation, it is highly recommended to test it extensively and verify that it meets all your use case requirements in a test environment first.

Going back to the duplicate documents, how often does this occur? While a time series collection is great for ingesting the type of data you have, have you considered using a regular collection with a unique index that can deal with duplicate values as they are inserted?

“but updating fields seems to be not possible with time series currently.”

Regarding the above: starting in MongoDB 5.0.5, you can perform some delete and update operations, subject to the requirements listed in the Time Series Collections Limitations - Updates and Deletes documentation.

In addition, the Indexes documentation for time series collections may be of use to you.

Hope this helps.

Regards,
Jason

Hi Jason,

Wow, thanks for your great help! Really appreciate it :)

I tested your approach. Unfortunately, performance depends on the total number of documents, since $_internalUnpackBucket needs to unpack all documents and the subsequent grouping is performed on all of them. So this wouldn’t scale well? Moreover, the sort operation might eventually fail? In one example, 12108 transactions had to be unpacked and grouped to retrieve the last 200 transactions.

Using the approach in my first post is significantly faster, since we only need to unpack the required documents. In the given time frame, the last 453 documents were retrieved, grouped, and sorted in 12 ms. Note that Compass used the index but didn’t indicate it. However, we would under-fetch or over-fetch and would need to use $limit to cap at 200 documents…

This is why I thought going through system.buckets directly might help, but that doesn’t feel right and comes with the drawbacks you mentioned.

Some more questions:

  1. How would you implement a separate process for filtering?
  2. How would you implement an aggregation to get the next 200 transactions in your approach (i.e., get 200, the next 200, the next next 200, …)?

“Going back to the duplicate documents, how often does this occur?”

~0.23% of documents are duplicates in our sample (50k docs).

“have you considered using a regular collection with a unique index that can deal with the duplicate values as it is being inserted?”

Yes, but a time series collection feels natural for this problem, since it takes the burden of creating buckets, efficiently storing the data, enabling nice queries, … off our shoulders.

“With regards to the above and starting in MongoDB version 5.0.5, you can perform some delete and update operations”

Unfortunately, the queries can only be performed on the meta fields, which doesn’t help in our scenario.

Thanks,
Ben

Tested and solved for us with 6.0.0-rc13 :) We just need to $match, $sort, and $limit before doing anything else. This way MongoDB limits the number of buckets to be unpacked instead of unpacking all of them. For instance:

[
  {
      $match: {
          'meta.ti': 1,
          'meta.type': 'P'
      }
  }, {
      $sort: {
          ts: -1
      }
  }, {
      $limit: 50
  }, 
  ...
]

And if you want to retrieve the next 50 you just use the timestamp of the last element:

[
  {
      $match: {
          'meta.ti': 1,
          'meta.type': 'P',
          "ts" : {$lt: ISODate('2022-07-07T09:59:42.743Z')}
      }
  }, {
      $sort: {
          ts: -1
      }
  }, {
      $limit: 50
  }, 
  ...
]

Using an index on the necessary meta fields makes this blazing fast :)
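
The two pipelines above differ only in the optional ts bound, so they could be produced by one small builder. This is a sketch under assumptions: `buildPagePipeline` and its parameter names are hypothetical, and the stages elided with `...` above are not reproduced here.

```javascript
// Hypothetical helper that assembles the $match / $sort / $limit prefix shown
// above. Pass the `ts` of the last document of the previous page as `before`
// to fetch the next page.
function buildPagePipeline({ ti, type, pageSize, before }) {
  const match = { "meta.ti": ti, "meta.type": type };
  if (before) {
    match.ts = { $lt: before }; // strictly older than the previous page
  }
  return [
    { $match: match },
    { $sort: { ts: -1 } },
    { $limit: pageSize },
    // Any further stages (for example a de-duplicating $group) go after this.
  ];
}

const firstPage = buildPagePipeline({ ti: 1, type: "P", pageSize: 50 });
const nextPage = buildPagePipeline({
  ti: 1,
  type: "P",
  pageSize: 50,
  before: new Date("2022-07-07T09:59:42.743Z"),
});
console.log(JSON.stringify(nextPage[0]));
```

Two things to keep in mind with this shape: because `$lt` is strict, documents that share the exact boundary timestamp would be skipped on the next page, and if duplicates are removed after the $limit, a page can end up with fewer than `pageSize` unique documents.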
