Thank you for this post. I was struggling with the same question. Your post steered me in the right direction on how to solve this, as did this post. In my case I need to get the most recent sensor values of a device. The sensor is identified by the ‘name’ field and the device by the ‘thingID’ field, so I aggregate using $match on thingID and $group on name. The records are added with ‘metadata’ set to “thingID” and “name”, as was recommended elsewhere. This speeds up the aggregate query by a factor of 5 or so. (Adding ‘name’ didn’t make a difference, though.)
I found that sorting on timestamp worked as expected, but sorting on control.max.timestamp didn’t return the most recent record. Both have similar performance. Getting the most recent sensor values from 100K records with 10 devices and 5 sensor types takes 20 msec on my desktop PC, which has an i5-4570S CPU @ 2.90GHz. 300K records take 63 msec, so it looks linear.
My code is below. It is written in Go (golang). I’m new to MongoDB, so please forgive the ugliness. It feels like a rather kludgy way to get the results. For example, how can I get all fields in the result instead of listing each one using $first?
Creating the collection:
// Time-series settings: "timestamp" is the time field, "metadata" the meta
// field, and the granularity is set to minutes.
tso := options.TimeSeries().
	SetTimeField("timestamp").
	SetMetaField("metadata").
	SetGranularity("minutes")
// Every other collection option is left unset.
co := options.CreateCollection().SetTimeSeriesOptions(tso)
// Create the events collection with the time-series options applied.
err = srv.storeDB.CreateCollection(ctx, srv.eventCollectionName, co)
Adding an event:
// AddEvent adds a new event to the history store.
//
// args.Name and args.ThingID are required; an error is returned when either is
// empty. When args.Created is empty it is set to the current UTC time formatted
// as RFC3339. The 'created' field is parsed using time.RFC3339 and used as the
// event timestamp; a value that cannot be parsed is rejected with an error
// instead of being stored with a zero timestamp.
//
// The returned *emptypb.Empty is always nil; only the error is meaningful.
func (srv *HistoryStoreServer) AddEvent(ctx context.Context, args *thing.ThingValue) (*emptypb.Empty, error) {
	// Name and ThingID are required fields
	if args.Name == "" || args.ThingID == "" {
		err := fmt.Errorf("missing name or thingID")
		logrus.Warning(err)
		return nil, err
	}
	if args.Created == "" {
		args.Created = time.Now().UTC().Format(time.RFC3339)
	}

	// It would be nice to simply use bson marshal, but that isn't possible as the
	// required timestamp needs to be added in BSON format.
	createdTime, err := time.Parse(time.RFC3339, args.Created)
	if err != nil {
		err = fmt.Errorf("parsing created time %q: %w", args.Created, err)
		logrus.Warning(err)
		return nil, err
	}
	timestamp := primitive.NewDateTimeFromTime(createdTime)

	evBson := bson.M{
		TimeStampField: timestamp,
		// the metadata on thingID and name speeds up aggregate query by factor 5
		"metadata": bson.M{"thingID": args.ThingID, "name": args.Name},
		"name":     args.Name,
		"thingID":  args.ThingID,
		"valueID":  args.ValueID,
		"value":    args.Value,
		"created":  args.Created,
		"actionID": args.ActionID,
	}
	// The insert result (the generated document ID) is not needed by callers.
	_, err = srv.eventCollection.InsertOne(ctx, evBson)
	return nil, err
}
Get the most recent sensor values of a device:
// GetLatestValues returns the last received event/properties of a Thing.
//
// It runs an aggregation that matches documents on args.ThingID, sorts them by
// "timestamp" descending and groups them by name, taking the first (most
// recent) name, created, value, valueID and thingID of each group. The result
// maps each name to its most recent value.
//
// An error is returned when the aggregation fails, when a result document
// cannot be decoded, or when the cursor reports an error during iteration. In
// those cases the returned map is nil.
func (srv *HistoryStoreServer) GetLatestValues(ctx context.Context,
	args *svc.GetLatest_Args) (*svc.ThingValueMap, error) {

	values := &svc.ThingValueMap{PropValues: make(map[string]*thing.ThingValue)}

	matchStage := bson.D{
		{Key: "$match", Value: bson.D{
			{Key: "thingID", Value: args.ThingID},
		}},
	}
	// Sorting on "control.max.timestamp" was tried instead, but it did not
	// return the most recent record.
	sortStage := bson.D{
		{Key: "$sort", Value: bson.D{
			{Key: "timestamp", Value: -1},
		}},
	}
	// After the descending sort, $first yields the most recent document's
	// field for each name.
	groupStage := bson.D{
		{Key: "$group", Value: bson.D{
			{Key: "_id", Value: "$name"},
			{Key: "name", Value: bson.M{"$first": "$name"}},
			{Key: "created", Value: bson.M{"$first": "$created"}},
			{Key: "value", Value: bson.M{"$first": "$value"}},
			{Key: "valueID", Value: bson.M{"$first": "$valueID"}},
			{Key: "thingID", Value: bson.M{"$first": "$thingID"}},
		}},
	}
	pipeline := mongo.Pipeline{matchStage, sortStage, groupStage}

	cursor, err := srv.eventCollection.Aggregate(ctx, pipeline)
	if err != nil {
		logrus.Error(err)
		return nil, err
	}
	// Release the server-side cursor on every return path.
	defer cursor.Close(ctx)

	count := 0
	for cursor.Next(ctx) {
		value := &thing.ThingValue{}
		if err = cursor.Decode(value); err != nil {
			logrus.Error(err)
			return nil, err
		}
		values.PropValues[value.Name] = value
		count++
	}
	// Next returns false both on exhaustion and on failure; distinguish them.
	if err = cursor.Err(); err != nil {
		logrus.Error(err)
		return nil, err
	}
	logrus.Infof("Iterated %d values", count)
	return values, nil
}