Explore Developer Center's New Chatbot! MongoDB AI Chatbot can be accessed at the top of your navigation to answer all your MongoDB questions.

Join us at AWS re:Invent 2024! Learn how to use MongoDB for AI use cases.
MongoDB Developer
C#
plus
Sign in to follow topics
MongoDB Developer Centerchevron-right
Developer Topicschevron-right
Languageschevron-right
C#chevron-right

Handling Complex Aggregation Pipelines With C#

Markus Wildgruber5 min read • Published Nov 15, 2024 • Updated Nov 15, 2024
.NETC#
Facebook Icontwitter iconlinkedin icon
Rate this tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
As shown in my last article on the basics of running aggregation pipelines with the MongoDB C# driver, there are plenty of ways to set up and run aggregation pipelines from a .NET application. The MongoDB C# driver brings a powerful LINQ driver that simplifies building aggregation pipelines for developers who are experienced with C# and less with MongoDB.
For experienced developers, or when you have complex aggregation pipelines, you will likely use MongoDB Compass or another tool to set up, test, and fine-tune an aggregation pipeline that you want to run from a C# application later on. Using this easy workflow, you can use all aggregation pipeline stages and build the pipeline on a realistic dataset before including it in code. Also, for experienced MongoDB developers, it might be easier to read the aggregation pipeline in JSON form instead of C# statements.
This tutorial focusses on an approach that enables experienced MongoDB developers to use pipelines from MongoDB Compass and include them in C# code. This approach complements the basic methods shown in my last tutorial and can be used for complex scenarios.
The samples that are used throughout this article are based on the movies collection in the samples_mflix database. We will use the following Data Transfer Object (DTO) classes:
1[BsonIgnoreExtraElements]
2[BsonNoId]
3public class Movie
4{
5 [BsonElement("title")]
6 public required string Title { get; set; }
7
8 [BsonElement("year")]
9 public required int Year { get; set; }
10
11 [BsonElement("cast")]
12 public List<string> Cast { get; set; } = new();
13
14 [BsonElement("imdb")]
15 public Imdb Imdb { get; set; } = new();
16}
17
18[BsonIgnoreExtraElements]
19public class Imdb
20{
21 [BsonElement("rating")]
22 public double Rating { get; set; }
23}
24
25public class RatingByYear
26{
27 [BsonId]
28 public int Year { get; set; }
29
30 [BsonElement("rating")]
31 public double AvgRating { get; set; }
32}

Parsing aggregation pipelines from JSON

When setting up an aggregation pipeline in MongoDB Compass, you can retrieve the JSON for your pipeline by switching to the Text view: Use the "Text" button to switch to JSON view of the aggregation pipeline for easier copying
This shows the complete pipeline so that you can easily copy the JSON to the clipboard. As a first step, you can include the JSON in a C# multiline string. In our case, we use the aggregation from the last article for demonstration purposes:
1var pipelineStr = """
2 [
3 {
4 $match: {
5 cast: "Robert De Niro"
6 }
7 },
8 {
9 $group: {
10 _id: "$year",
11 rating: { $avg: "$imdb.rating" }
12 }
13 },
14 {
15 $sort: {
16 rating: -1
17 }
18 }
19 ]
20 """;
21
22PipelineDefinition<Movie, RatingByYear> pipeline = BsonSerializer.Deserialize<BsonArray>(pipelineStr)
23 .Select(x => x.AsBsonDocument)
24 .ToArray();
25
26var result = await (await movies.AggregateAsync<RatingByYear>(pipeline)).ToListAsync();
The above code deserializes the pipeline from the multiline string and converts it to an array of BsonDocument objects. There is an implicit conversion operator that allows converting the array to a PipelineDefinition<Movie, RatingByYear> object. This pipeline definition can then be supplied as a variable to the AggregateAsync method that runs the aggregation and returns the result.
Instead of including the pipeline as a string in your C# code, you can also read it from a file that you deploy with your application, or embed a JSON file as a resource in your application:
Set build action to "Embedded resource" to include JSON file as a resource
This provides a better separation between the C# code and the pipelines. You can access the resource like this:
1// Be sure to use the correct resource name, the schema is as follows:
2// <PROJECT ROOT NAMESPACE>.<Folders>.<FileName>
3// If the resource name is invalid, null is returned
4using var stream = typeof(Program).Assembly.GetManifestResourceStream("MyNamespace.Pipelines.RatingByYear.json");
5using var reader = new StreamReader(stream!);
6var pipelineStr = await reader.ReadToEndAsync();
7
8PipelineDefinition<Movie, RatingByYear> pipeline = BsonSerializer.Deserialize<BsonArray>(pipelineStr)
9 .Select(x => x.AsBsonDocument)
10 .ToArray();
11
12var result = await (await movies.AggregateAsync(pipeline)).ToListAsync();

Using variables with aggregation pipelines

Having a static pipeline is a good start, but what if you want to make adjustments when running the aggregation pipeline? In our example, we might be interested in querying other members of the cast and only return a configurable number of years with the top ratings.
1[
2 {
3 "$match": {
4 "$expr": {
5 "$in": [
6 "$$castMember",
7 {
8 "$ifNull": [
9 "$cast",
10 []
11 ]
12 }
13 ]
14 }
15 }
16 },
17 {
18 "$group": {
19 "_id": "$year",
20 "rating": { "$avg": "$imdb.rating" }
21 }
22 },
23 {
24 "$setWindowFields": {
25 "sortBy": { "rating": -1 },
26 "output": {
27 "docNo": {
28 "$documentNumber": {}
29 }
30 }
31 }
32 },
33 {
34 "$match": {
35 "$expr": {
36 "$lte": [ "$docNo", "$$topN" ]
37 }
38 }
39 },
40 {
41 "$unset": "docNo"
42 }
43]
The above pipeline has been adjusted to use the variables $$castMember and $$topN. We will have a closer look on these adjustments in the following section. For now, we focus on running the pipeline in C#. To provide values for the variables, an AggregationOptions object is used, as shown below:
1using var stream = typeof(Program).Assembly.GetManifestResourceStream("MyNamespace.Pipelines.RatingByYearParameterized.json");
2using var reader = new StreamReader(stream!);
3var pipelineStr = await reader.ReadToEndAsync();
4
5PipelineDefinition<Movie, RatingByYear> pipeline = BsonSerializer.Deserialize<BsonArray>(pipelineStr)
6 .Select(x => x.AsBsonDocument)
7 .ToArray();
8
9var options = new AggregateOptions()
10{
11 Let = new BsonDocument()
12 {
13 { "castMember", "Al Pacino" },
14 { "topN", 10 },
15 }
16};
17
18var result = await (await movies.AggregateAsync(pipeline, options)).ToListAsync();

Adjusting the pipeline for the use of variables

As you have seen, the pipeline has been adjusted to use the variables. This is necessary because not all stages use the variable values as expected. For instance, the first $match stage does not evaluate a variable when using the standard $match syntax:
1{
2 $match: {
3 cast: "$$castMember"
4 }
5},
As the condition as such is valid for MongoDB, this does not raise an error but leads to an empty result set as there is no movie with a cast member that goes by the unusual name $$castMember. Using $expr in the condition asserts that the variables are replaced with their values when performing the comparison:
1{
2 "$match": {
3 "$expr": {
4 "$in": [
5 "$$castMember",
6 {
7 "$ifNull": [
8 "$cast",
9 []
10 ]
11 }
12 ]
13 }
14 }
15},
In addition, the expression syntax handles null values differently, so we need to add an $ifNull expression that asserts that missing or null values are replaced with an empty array.
In most cases, however, MongoDB raises an error when the variable cannot be used. In our sample, the pipeline cannot use the $limit stage because this stage expects a static value for the maximum number of documents that should be returned. Hence we have to replace the simple $limit stage with a combination of:
  • $setWindowFields that sorts the documents and adds a property docNo for the number of the document.
  • $match that keeps only the number of documents specified in the variable (also with an $expr condition).
  • $unset to remove the docNo property as we do not need it in our result set.
1{
2 "$setWindowFields": {
3 "sortBy": { "rating": -1 },
4 "output": {
5 "docNo": {
6 "$documentNumber": {}
7 }
8 }
9 }
10},
11{
12 "$match": {
13 "$expr": {
14 "$lte": [ "$docNo", "$$topN" ]
15 }
16 }
17},
18{
19 "$unset": "docNo"
20}
When testing your pipeline, be aware that you also need to look for unexpected and especially empty result sets when introducing variables in a pipeline.

Summary

This tutorial outlined several ways to handle complex aggregation pipelines in C# while still keeping the original MongoDB pipeline definition available. You can use the latest and greatest MongoDB aggregation pipeline stages in their original form. Also, for developers who are experienced with MongoDB, the JSON format of the pipeline might be easier to read. When applying changes to the pipeline later on, you can test and analyze the exact pipeline in MongoDB Compass or the MongoDB shell.
What is your favorite method to tackle challenging aggregation pipelines with the MongoDB C# driver? Let us know in the MongoDB Developer Community Forums!
Top Comments in Forums
There are no comments on this article yet.
Start the Conversation

Facebook Icontwitter iconlinkedin icon
Rate this tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Related
Tutorial

Integrating MongoDB with TensorFlow and C#


Sep 05, 2024 | 8 min read
Tutorial

Getting Started with Microsoft's Semantic Kernel in C# and MongoDB Atlas


Oct 10, 2024 | 10 min read
Tutorial

Using Polymorphism with MongoDB and C#


Apr 30, 2024 | 6 min read
Tutorial

Joining Collections in MongoDB with .NET Core and an Aggregation Pipeline


Feb 03, 2023 | 7 min read
Table of Contents