Aggregation group by

Hi
I have 2 collections and i want to join them and filter max value

my data is like below
// schemas
row1 = {“topic_name”: “rating01”, “version”: “1”, “reg_dt”: “20220401 22:10:05”}
row2 = {“topic_name”: “rating01”, “version”: “2”, “reg_dt”: “20220402 10:01:10”}
row3 = {“topic_name”: “rating02”, “version”: “1”, “reg_dt”: “20220403 11:50:02”}
row4 = {“topic_name”: “rating03”, “version”: “1”, “reg_dt”: “20220303 12:00:00”}
row5 = {“topic_name”: “rating03”, “version”: “2”, “reg_dt”: “20220320 01:16:12”}
row6 = {“topic_name”: “rating03”, “version”: “3”, “reg_dt”: “20220321 01:20:15”}

//meta
ro1 = {“topic_name”: “rating01”, “schema_version”: “1”, “reg_dt”: “20220401 23:30:04”}
ro2 = {“topic_name”: “rating03”, “schema_version”: “1”, “reg_dt”: “20220310 16:20:00”}
ro3 = {“topic_name”: “rating03”, “schema_version”: “2”, “reg_dt”: “20220321 02:50:55”}

db.schemas.insertOne(row1)
db.schemas.insertOne(row2)
db.schemas.insertOne(row3)
db.schemas.insertOne(row4)
db.schemas.insertOne(row5)
db.schemas.insertOne(row6)

db.meta.insertOne(ro1)
db.meta.insertOne(ro2)
db.meta.insertOne(ro3)

i used agg pipeline like this,
var options = {
allowDiskUse: true
};

var pipeline = [
{
“$project”: {
“_id”: 0,
“meta”: “$$ROOT”
}
},
{
“$lookup”: {
“localField”: “meta.non_existing_field”,
“from”: “schemas”,
“foreignField”: “non_existing_field”,
“as”: “schemas”
}
},
{
“$unwind”: {
“path”: “$schemas”,
“preserveNullAndEmptyArrays”: false
}
},
{
“$match”: {
“$and”: [
{
“$expr”: {
“$eq”: [
“$meta.topic_name”,
“$schemas.topic_name”
]
}
},
{
“$expr”: {
“$lt”: [
“$meta.schema_version”,
“$schemas.version”
]
}
},
{
“$expr”: {
“$lt”: [
“$meta.reg_dt”,
“$schemas.reg_dt”
]
}
}
]
}
},
{
“$project”: {
“meta.topic_name”: “$meta.topic_name”,
“meta.schema_version”: “$meta.schema_version”,
“meta.reg_dt”: “$meta.reg_dt”,
“schemas.topic_name”: “$schemas.topic_name”,
“schemas.version”: “$schemas.version”,
“schemas.reg_dt”: “$schemas.reg_dt”,
“_id”: 0
}
}
];

db.meta.aggregate(pipeline)

the output is like this,

{ meta:    { topic_name: 'rating01',     schema_version: '1',     reg_dt: '20220401 23:30:04' },  
schemas:    { topic_name: 'rating01',     version: '2',     reg_dt: '20220402 10:01:10' } }
{ meta:    { topic_name: 'rating03',     schema_version: '1',     reg_dt: '20220310 16:20:00' },  
schemas:    { topic_name: 'rating03',     version: '2',     reg_dt: '20220320 01:16:12' } }
{ meta:    { topic_name: 'rating03',     schema_version: '1',     reg_dt: '20220310 16:20:00' },  
schemas:    { topic_name: 'rating03',     version: '3',     reg_dt: '20220321 01:20:15' } }

I want to filter only max of schemas.version if topic name is same
my expecting result is like this,

{ meta: { topic_name: ‘rating01’, schema_version: ‘1’, reg_dt: ‘20220401 23:30:04’ },
schemas: { topic_name: ‘rating01’, version: ‘2’, reg_dt: ‘20220402 10:01:10’ } }

{ meta:    { topic_name: 'rating03',     schema_version: '1',     reg_dt: '20220310 16:20:00' },  
schemas:    { topic_name: 'rating03',     version: '3',     reg_dt: '20220321 01:20:15' } }

Anyone please help me ?
thanks

Hello. You can perform the following aggregation to get the desired result. You can “join” data in two collections based on a common field - in this case the topic_name.

db.meta.aggregate([
{ 
    $sort: { topic_name: 1 } 
},
//
// Group by topic name to get only one (the first) topic name
{
    $group: { 
        _id: "$topic_name", 
        meta: { $first: "$$ROOT" } 
    }
},
//
// Lookup the schemas for matching topic name
{ 
    $lookup: {
        localField: "meta.topic_name",
        foreignField: "topic_name",
        from: "schemas",
        as: "schemas"
    }
},
//
// Determine the max version of the schemas for each topic name
{ 
    $project: {
        _id: 0,
        meta: 1,
        schemas: {
            $reduce: {
                input: "$schemas",
                initialValue: { $arrayElemAt: [ "$schemas", 0 ] },
                in: {
                    $cond: [ { $gt: [ "$$this.version", "$$value.version" ] },
                        "$$this", 
                        "$$value"
                    ]
                }
            }
        }
    }
},
])
1 Like

Thank very much !

// schemas
row1 = {“topic_name”: “rating01”, “version”: “1”, “reg_dt”: “20220401 22:10:05”}
row2 = {“topic_name”: “rating01”, “version”: “2”, “reg_dt”: “20220402 10:01:10”}
row3 = {“topic_name”: “rating02”, “version”: “1”, “reg_dt”: “20220403 11:50:02”}
row4 = {“topic_name”: “rating03”, “version”: “1”, “reg_dt”: “20220303 12:00:00”}
row5 = {“topic_name”: “rating03”, “version”: “2”, “reg_dt”: “20220320 01:16:12”}
row6 = {“topic_name”: “rating03”, “version”: “3”, “reg_dt”: “20220321 01:20:15”}

//meta
ro1 = {“topic_name”: “rating01”, “schema_version”: “1”, “reg_dt”: “20220401 23:30:04”}
ro2 = {“topic_name”: “rating03”, “schema_version”: “1”, “reg_dt”: “20220310 16:20:00”}
ro3 = {“topic_name”: “rating03”, “schema_version”: “2”, “reg_dt”: “20220321 02:50:55”}

db.schemas.insertOne(row1)
db.schemas.insertOne(row2)
db.schemas.insertOne(row3)
db.schemas.insertOne(row4)
db.schemas.insertOne(row5)
db.schemas.insertOne(row6)

db.meta.insertOne(ro1)
db.meta.insertOne(ro2)
db.meta.insertOne(ro3)

var pipeline = [
{
$sort: { topic_name: 1 }
},
// Group by topic name to get only one (the first) topic name
{
$group: {
_id: “$topic_name”,
meta: { $first: “$$ROOT” }
}
},
// Lookup the schemas for matching topic name
{
$lookup: {
localField: “meta.topic_name”,
foreignField: “topic_name”,
from: “schemas”,
as: “schemas”
}
},
// Determine the max version of the schemas for each topic name
{
$project: {
_id: 0,
meta: 1,
schemas: {
$reduce: {
input: “$schemas”,
initialValue: {},
in: {
$cond: [ { $gt: [ “$$this.version”, “$$value.version” ] },
“$$this”,
“$$value”
]
}
}
}
}
},
];

db.meta.aggregate(pipeline)

{ meta:
{ _id: ObjectId(“625d09f00fc3c6d8941b9e79”),
topic_name: ‘rating03’,
schema_version: ‘1’,
reg_dt: ‘20220310 16:20:00’ },
schemas:
{ _id: ObjectId(“625d09f00fc3c6d8941b9e77”),
topic_name: ‘rating03’,
version: ‘3’,
reg_dt: ‘20220321 01:20:15’ } }
{ meta:
{ _id: ObjectId(“625d09f00fc3c6d8941b9e78”),
topic_name: ‘rating01’,
schema_version: ‘1’,
reg_dt: ‘20220401 23:30:04’ },
schemas:
{ _id: ObjectId(“625d09f00fc3c6d8941b9e73”),
topic_name: ‘rating01’,
version: ‘2’,
reg_dt: ‘20220402 10:01:10’ } }

If i only want to see the latest schema_version of topic in the meta collection, just as only see the latest version topic in the schema collection.
How to implement this case?

my expecting result is like this,
{ meta:
{ _id: ObjectId(“625d09f00fc3c6d8941b9e79”),
topic_name: ‘rating03’,
schema_version: ‘2’,
reg_dt: ‘20220310 16:20:00’ },
schemas:
{ _id: ObjectId(“625d09f00fc3c6d8941b9e77”),
topic_name: ‘rating03’,
version: ‘3’,
reg_dt: ‘20220321 01:20:15’ } }
{ meta:
{ _id: ObjectId(“625d09f00fc3c6d8941b9e78”),
topic_name: ‘rating01’,
schema_version: ‘1’,
reg_dt: ‘20220401 23:30:04’ },
schemas:
{ _id: ObjectId(“625d09f00fc3c6d8941b9e73”),
topic_name: ‘rating01’,
version: ‘2’,
reg_dt: ‘20220402 10:01:10’ } }

Could you help me please
Thank you

Replace the existing $sort stage with this, in the pipeline:

{ 
    $sort: { topic_name: 1, schema_version: -1 } 
},
1 Like

Thank very much ! Saya

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.