@Bradley_Benjamin, here is the aggregation query and its Golang version. You will notice that I have made some refinements to the query. The output is the same as I described in my earlier comments.
The Aggregation query:
// Look-back window between the latest document and the one it is compared
// with: 5 * 60 * 1000 = 5 minutes. $subtract treats a number subtracted from
// a date as milliseconds.
const period = 300000;

// Builds a $dateTrunc expression that rounds `date` down to the start of its
// 60-second bin, so that two timestamps can be compared with minute precision.
const truncToMinute = (date) => ({
  $dateTrunc: { date: date, unit: "second", binSize: 60 }
});

// Stage 1: order by trading pair, newest document first within each pair,
// so that $first in the next stage picks the latest document.
const sortStage = {
  $sort: { trading_pair: 1, time_stamp: -1 }
};

// Stage 2: one output document per trading pair, carrying every document of
// that pair plus the time stamp and price of the newest one.
const groupStage = {
  $group: {
    _id: "$trading_pair",
    docs: { $push: "$$ROOT" },
    latest_time_stamp: { $first: "$time_stamp" },
    latest_price: { $first: "$price" }
  }
};

// Stage 3: prev_doc is the first document of the pair whose truncated time
// stamp equals the truncated latest time stamp minus `period`. When no such
// document exists, a placeholder with "none" values is used instead.
const addFieldsStage = {
  $addFields: {
    prev_doc: {
      $let: {
        vars: {
          filtered: {
            $filter: {
              input: "$docs",
              // "$$this" is the current element of "$docs".
              cond: {
                $eq: [
                  truncToMinute("$$this.time_stamp"),
                  { $subtract: [ truncToMinute("$latest_time_stamp"), period ] }
                ]
              }
            }
          }
        },
        in: {
          $cond: [
            { $eq: [ { $size: "$$filtered" }, 0 ] },
            { time_stamp: "none", price: "none" },
            { $arrayElemAt: [ "$$filtered", 0 ] }
          ]
        }
      }
    }
  }
};

// Stage 4: shape the result and compute
// (latest_price - prev_price) * 100 / prev_price,
// or "none" when there is no previous document to compare with.
const projectStage = {
  $project: {
    trading_pair: "$_id",
    _id: 0,
    latest_time_stamp: 1,
    latest_price: 1,
    prev_time_stamp: "$prev_doc.time_stamp",
    prev_price: "$prev_doc.price",
    percent_change: {
      $cond: [
        { $eq: [ "$prev_doc.price", "none" ] },
        "none",
        {
          $divide: [
            { $multiply: [ { $subtract: [ "$latest_price", "$prev_doc.price" ] }, 100 ] },
            "$prev_doc.price"
          ]
        }
      ]
    }
  }
};

db.test.aggregate([ sortStage, groupStage, addFieldsStage, projectStage ])
The Golang code:
period := 300000 // 5 * 60 * 1000 = 5 mins, in milli-seconds
sortStage := bson.D{{"$sort", bson.D{{"trading_pair", 1}, {"time_stamp", -1}}}}
groupStage := bson.D{{"$group", bson.D{{"_id","$trading_pair"}, {"latest_time_stamp", bson.D{{"$first", "$time_stamp"}}}, {"latest_price", bson.D{{"$first", "$price"}}}, {"docs", bson.D{{"$push", "$$ROOT"}}}}}}
addFieldsStage := bson.D{{"$addFields", bson.D{{"prev_doc", bson.D{{"$let", bson.D{{"vars", bson.D{{"filtered", bson.D{{"$filter", bson.D{{"input", "$docs"}, {"cond", bson.D{{"$eq", bson.A{bson.D{{"$dateTrunc", bson.D{{"date", "$$this.time_stamp"}, {"unit", "second"}, {"binSize", 60}}}}, bson.D{{"$subtract", bson.A{bson.D{{"$dateTrunc", bson.D{{"date", "$latest_time_stamp"}, {"unit", "second"}, {"binSize", 60}}}}, period }}}}}}}}}}}}}, { "in", bson.D{{"$cond", bson.A{bson.D{{"$eq", bson.A{bson.D{{"$size", "$$filtered"}}, 0}}}, bson.D{{"time_stamp", "none"}, {"price", "none"}}, bson.D{{"$arrayElemAt", bson.A{"$$filtered", 0}}}}}}}}}}}}}}
projectStage := bson.D{{"$project", bson.D{{"trading_pair", "$_id"}, {"_id", 0}, {"latest_time_stamp", 1}, {"latest_price", 1}, {"prev_time_stamp", "$prev_doc.time_stamp"}, {"prev_price", "$prev_doc.price"}, {"percent_change", bson.D{{"$cond", bson.A{bson.D{{"$eq", bson.A{"$prev_doc.price", "none"}}}, "none", bson.D{{"$divide", bson.A{ bson.D{{"$multiply", bson.A{ bson.D{{"$subtract", bson.A{"$latest_price", "$prev_doc.price" }}}, 100 }}}, "$prev_doc.price" }}}}}}}}}}
crsr, err := collection.Aggregate(ctx, mongo.Pipeline{ sortStage, groupStage, addFieldsStage, projectStage })
if err != nil {
fmt.Println("Failed to Aggregate:")
log.Fatal(err)
}
for crsr.Next(context.TODO()) {
var result bson.M
if err := crsr.Decode(&result); err != nil {
fmt.Println("Cursor decode error:")
log.Fatal(err)
}
fmt.Println(result)
}
if err := crsr.Err(); err != nil {
fmt.Println("Cursor error:")
log.Fatal(err)
}