
Getting Started with Aggregation Pipelines in Rust

Published: Mar 29, 2021

  • Atlas
  • MongoDB
  • Rust

By Mark Smith


MongoDB's aggregation pipelines are one of its most powerful features. They allow you to write expressions, broken down into a series of stages, which perform operations including aggregation, transformations, and joins on the data in your MongoDB databases. This allows you to do calculations and analytics across documents and collections within your MongoDB database.

#Prerequisites

This quick start is the second in a series of Rust posts. I highly recommend you start with my first post, Basic MongoDB Operations in Rust, which will show you how to get set up correctly with a free MongoDB Atlas database cluster containing the sample data you'll be working with here. Go read it and come back. I'll wait. Without it, you won't have the database set up correctly to run the code in this quick start guide.

In summary, you'll need:

  • An up-to-date version of Rust. I used 1.49, but any recent version should work well.
  • A code editor of your choice. I recommend VS Code with the Rust Analyzer extension.
  • A MongoDB cluster containing the sample_mflix dataset. You can find instructions to set that up in the first blog post in this series.

#Getting Started

MongoDB's aggregation pipelines are very powerful and so they can seem a little overwhelming at first. For this reason, I'll start off slowly. First, I'll show you how to build up a pipeline that duplicates behaviour that you can already achieve with MongoDB's find() method, but instead using an aggregation pipeline with $match, $sort, and $limit stages. Then, I'll show how to make queries that go beyond what can be done with find, demonstrating using $lookup to include related documents from another collection. Finally, I'll put the "aggregation" into "aggregation pipeline" by showing you how to use $group to group together documents to form new document summaries.

All of the sample code for this quick start series can be found on GitHub. I recommend you check it out if you get stuck, but otherwise, it's worth following the tutorial and writing the code yourself!

All of the pipelines in this post will be executed against the sample_mflix database's movies collection. It contains documents that look like this (I'm showing you what they look like in Python, because it's a little more readable than the equivalent Rust struct):

{
    '_id': ObjectId('573a1392f29313caabcdb497'),
    'awards': {'nominations': 7,
               'text': 'Won 1 Oscar. Another 2 wins & 7 nominations.',
               'wins': 3},
    'cast': ['Janet Gaynor', 'Fredric March', 'Adolphe Menjou', 'May Robson'],
    'countries': ['USA'],
    'directors': ['William A. Wellman', 'Jack Conway'],
    'fullplot': 'Esther Blodgett is just another starry-eyed farm kid trying to '
                'break into the movies. Waitressing at a Hollywood party, she '
                'catches the eye of alcoholic star Norman Maine, is given a test, '
                'and is caught up in the Hollywood glamor machine (ruthlessly '
                'satirized). She and her idol Norman marry; but his career '
                'abruptly dwindles to nothing',
    'genres': ['Drama'],
    'imdb': {'id': 29606, 'rating': 7.7, 'votes': 5005},
    'languages': ['English'],
    'lastupdated': '2015-09-01 00:55:54.333000000',
    'plot': 'A young woman comes to Hollywood with dreams of stardom, but '
            'achieves them only with the help of an alcoholic leading man whose '
            'best days are behind him.',
    'poster': 'https://m.media-amazon.com/images/M/MV5BMmE5ODI0NzMtYjc5Yy00MzMzLTk5OTQtN2Q3MzgwOTllMTY3XkEyXkFqcGdeQXVyNjc0MzMzNjA@._V1_SY1000_SX677_AL_.jpg',
    'rated': 'NOT RATED',
    'released': datetime.datetime(1937, 4, 27, 0, 0),
    'runtime': 111,
    'title': 'A Star Is Born',
    'tomatoes': {'critic': {'meter': 100, 'numReviews': 11, 'rating': 7.4},
                 'dvd': datetime.datetime(2004, 11, 16, 0, 0),
                 'fresh': 11,
                 'lastUpdated': datetime.datetime(2015, 8, 26, 18, 58, 34),
                 'production': 'Image Entertainment Inc.',
                 'rotten': 0,
                 'viewer': {'meter': 79, 'numReviews': 2526, 'rating': 3.6},
                 'website': 'http://www.vcientertainment.com/Film-Categories?product_id=73'},
    'type': 'movie',
    'writers': ['Dorothy Parker (screen play)',
                'Alan Campbell (screen play)',
                'Robert Carson (screen play)',
                'William A. Wellman (from a story by)',
                'Robert Carson (from a story by)'],
    'year': 1937
}

There's a lot of data there, but I'll be focusing mainly on the _id, title, year, and cast fields.

#Your First Aggregation Pipeline

Aggregation pipelines are executed by the mongodb crate using a Collection's aggregate() method.

The first argument to aggregate() is a sequence of pipeline stages to be executed. Much like a query, each stage of an aggregation pipeline is a BSON document. You'll often create these using the doc! macro that was introduced in the previous post.

An aggregation pipeline operates on all of the data in a collection. Each stage in the pipeline is applied to the documents passing through, and whatever documents are emitted from one stage are passed as input to the next stage, until there are no more stages left. At this point, the documents emitted from the last stage in the pipeline are returned to the client program, as a cursor, in a similar way to a call to find().

Individual stages, such as $match, can act as a filter, to only pass through documents matching certain criteria. Other stage types, such as $project, $addFields, and $lookup, will modify the content of individual documents as they pass through the pipeline. Finally, certain stage types, such as $group, will create an entirely new set of documents based on the documents passed into it taken as a whole. None of these stages change the data that is stored in MongoDB itself. They just change the data before returning it to your program! There are stages, like $out, which can save the results of a pipeline back into MongoDB, but I won't be covering them in this quick start.
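To make the flow of documents concrete, here is an in-memory analogy in plain Rust, with no MongoDB involved. The Movie struct and the stage_* functions are hypothetical names made up for this sketch; each one stands in for one of the three kinds of stage described above (filtering, per-document modification, and summarising the whole set).

```rust
/// A tiny stand-in for a movie document (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
pub struct Movie {
    pub title: String,
    pub year: i32,
}

/// Filtering stage (like `$match`): passes through only matching documents.
pub fn stage_filter_year_from(docs: Vec<Movie>, min_year: i32) -> Vec<Movie> {
    docs.into_iter().filter(|m| m.year >= min_year).collect()
}

/// Per-document stage (like `$addFields`): changes each document as it passes.
pub fn stage_uppercase_title(docs: Vec<Movie>) -> Vec<Movie> {
    docs.into_iter()
        .map(|m| Movie { title: m.title.to_uppercase(), ..m })
        .collect()
}

/// Whole-set stage (loosely like `$group`): builds a new value from all inputs.
pub fn stage_count(docs: &[Movie]) -> usize {
    docs.len()
}

/// Runs the stages in order: the output of each stage is the input of the next.
pub fn run_pipeline(input: Vec<Movie>) -> (Vec<Movie>, usize) {
    let matched = stage_filter_year_from(input, 1950);
    let modified = stage_uppercase_title(matched);
    let count = stage_count(&modified);
    (modified, count)
}
```

If you pass run_pipeline a clone of your input, the original vector is left untouched, which loosely mirrors the way a pipeline leaves the stored collection unchanged.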

I'm going to assume that you're working in the same environment that you used for the last post, so you should already have the mongodb crate configured as a dependency in your Cargo.toml file, and you should have a .env file containing your MONGODB_URI environment variable.

#Finding and Sorting

First, paste the following into your Rust code:

// Needs `use std::env;` and
// `use mongodb::options::{ClientOptions, ResolverConfig};` at the top of the file.

// Load the MongoDB connection string from an environment variable:
let client_uri =
    env::var("MONGODB_URI").expect("You must set the MONGODB_URI environment var!");

// An extra line of code to work around a DNS issue on Windows:
let options =
    ClientOptions::parse_with_resolver_config(&client_uri, ResolverConfig::cloudflare())
        .await?;
let client = mongodb::Client::with_options(options)?;

// Get the 'movies' collection from the 'sample_mflix' database:
let movies = client.database("sample_mflix").collection("movies");

The above code will provide a Collection instance called movies, which points to the movies collection in your database.

Here is some code which creates a pipeline, executes it with aggregate, and then loops through and prints the detail of each movie in the results. Paste it into your program.

// Usually implemented outside your main function.
// Assumes these imports at the top of the file (StreamExt from the futures
// crate is one way to get next() on the cursor):
//   use futures::stream::StreamExt;
//   use mongodb::bson::{self, doc};
//   use serde::Deserialize;
//   use std::fmt;
#[derive(Deserialize)]
struct MovieSummary {
    title: String,
    cast: Vec<String>,
    year: i32,
}

impl fmt::Display for MovieSummary {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}, {}, {}",
            self.title,
            // First cast member, or a placeholder if the array is empty:
            self.cast.first().map(String::as_str).unwrap_or("- no cast -"),
            self.year
        )
    }
}

// Inside main():
let pipeline = vec![
    doc! {
        // filter on movie title:
        "$match": {
            "title": "A Star Is Born"
        }
    },
    doc! {
        // sort by year, ascending:
        "$sort": {
            "year": 1
        }
    },
];

// Look up "A Star Is Born" in ascending year order:
let mut results = movies.aggregate(pipeline, None).await?;
// Loop through the results, convert each of them to a MovieSummary, and then print out.
while let Some(result) = results.next().await {
    // Use serde to deserialize into the MovieSummary struct:
    let doc: MovieSummary = bson::from_document(result?)?;
    println!("* {}", doc);
}

This pipeline has two stages. The first is a $match stage, which is similar to querying a collection with find(). It filters the documents passing through the stage using a query document. Because it's the first stage in the pipeline, its input is all of the documents in the movies collection. The query for the $match stage filters on the title field of the input documents, so the only documents that will be output from this stage will have a title of "A Star Is Born".

The second stage is a $sort stage. Only the documents for the movie "A Star Is Born" are passed to this stage, so the result will be all of the movies called "A Star Is Born," now sorted by their year field, with the oldest movie first.

Calls to aggregate() return a cursor pointing to the resulting documents. Cursor implements the Stream trait. The cursor can be looped through like any other stream, as long as you've imported StreamExt, which provides the next() method. The code above loops through all of the returned documents and prints a short summary, consisting of the title, the first actor in the cast array, and the year the movie was produced.

Executing the code above results in:

* A Star Is Born, Janet Gaynor, 1937
* A Star Is Born, Judy Garland, 1954
* A Star Is Born, Barbra Streisand, 1976
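The result loop relies on two things: every item the cursor yields is a Result, and the ? operator stops at the first error. Here is a synchronous analogy of that loop shape, using a plain Iterator in place of the async Stream, so there is no .await and no StreamExt. The summarise_all function and its tuple items are hypothetical, invented for this sketch.

```rust
/// Synchronous stand-in for the cursor loop: `rows` plays the role of the
/// cursor, and each item is a `Result`, like the items a cursor yields.
/// (Hypothetical helper; the real cursor is an async `Stream`.)
pub fn summarise_all<I>(mut rows: I) -> Result<Vec<String>, String>
where
    I: Iterator<Item = Result<(String, String, i32), String>>,
{
    let mut lines = Vec::new();
    // Same shape as `while let Some(result) = results.next().await`:
    while let Some(result) = rows.next() {
        // `?` unwraps a successful item, or returns the first error to the caller.
        let (title, first_actor, year) = result?;
        lines.push(format!("* {}, {}, {}", title, first_actor, year));
    }
    Ok(lines)
}
```

An error in the middle of the sequence ends the loop early and is handed back to the caller, which is also what result? does inside the async loop.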

#Refactoring the Code

It is possible to build up whole aggregation pipelines as a single data structure, as in the example above, but it's not necessarily a good idea. Pipelines can get long and complex. For this reason, I recommend you build up each stage of your pipeline as a separate variable, and then combine the stages into a pipeline at the end, like this:

// Match title = "A Star Is Born":
let stage_match_title = doc! {
    "$match": {
        "title": "A Star Is Born"
    }
};

// Sort by year, ascending:
let stage_sort_year_ascending = doc! {
    "$sort": { "year": 1 }
};

// Now the pipeline is easier to read:
let pipeline = vec![stage_match_title, stage_sort_year_ascending];

#Limit the Number of Results

Imagine I wanted to obtain the most recent production of "A Star Is Born" from the movies collection.

This can be thought of as three stages, executed in order:

  1. Obtain the movie documents for "A Star Is Born."
  2. Sort by year, descending.
  3. Discard all but the first document.
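If it helps to see these three steps outside of MongoDB, they can be sketched over an in-memory slice in plain Rust. This is only an analogy: the Movie struct and the most_recent_production function are hypothetical names, and the real work is done by the server, not by your program.

```rust
/// Hypothetical in-memory stand-in for a movie document.
#[derive(Debug, Clone, PartialEq)]
pub struct Movie {
    pub title: String,
    pub year: i32,
}

/// Mirrors `$match` -> `$sort` (descending) -> `$limit: 1` on a slice.
pub fn most_recent_production(movies: &[Movie], title: &str) -> Option<Movie> {
    // Step 1 (like $match): keep only the documents with the requested title.
    let mut matched: Vec<Movie> = movies
        .iter()
        .filter(|m| m.title == title)
        .cloned()
        .collect();
    // Step 2 (like $sort with -1): newest first.
    matched.sort_by(|a, b| b.year.cmp(&a.year));
    // Step 3 (like $limit 1): discard all but the first document.
    matched.into_iter().next()
}
```

The function returns None when nothing matches, in the same way that the pipeline would simply return an empty cursor.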

The first stage is already the same as stage_match_title above. The second stage is the same as stage_sort_year_ascending, but with the value 1 changed to -1. The third stage is a $limit stage.

The modified and new code looks like this:

// Sort by year, descending:
let stage_sort_year_descending = doc! {
    "$sort": {
        "year": -1
    }
};

// Limit to 1 document:
let stage_limit_1 = doc! { "$limit": 1 };

let pipeline = vec![stage_match_title, stage_sort_year_descending, stage_limit_1];

If you make the changes above and execute your code, then you should see just the following line:

* A Star Is Born, Barbra Streisand, 1976

Wait a minute! Why isn't there a document for the amazing production with Lady Gaga and Bradley Cooper?

Why don't you use the skills you've just learned to find the latest date in the collection? That will give you your answer!

Okay, so now you know how to filter, sort, and limit the contents of a collection using an aggregation pipeline. But these are just operations you can already do with find()! Why would you want to use these complex, new-fangled aggregation pipelines?

Read on, my friend, and I will show you the true power of MongoDB aggregation pipelines.

There's a dirty secret, hiding in the sample_mflix database. As well as the movies collection, there's also a collection called comments. Documents in the comments collection look like this:

{
    '_id': ObjectId('5a9427648b0beebeb69579d3'),
    'movie_id': ObjectId('573a1390f29313caabcd4217'),
    'date': datetime.datetime(1983, 4, 27, 20, 39, 15),
    'email': 'cameron_duran@fakegmail.com',
    'name': 'Cameron Duran',
    'text': 'Quasi dicta culpa asperiores quaerat perferendis neque. Est animi '
            'pariatur impedit itaque exercitationem.'
}

It's a comment for a movie. I'm not sure why people are writing Latin comments for these movies, but let's go with it. The second field, movie_id, corresponds to the _id value of a document in the movies collection.

So, it's a comment related to a movie!

Does MongoDB enable you to query movies and embed the related comments, like a JOIN in a relational database? Yes, it does, with the $lookup stage.

I'll show you how to obtain related documents from another collection, and embed them in the documents from your primary collection.

First, modify the definition of the MovieSummary struct so that it has a comments field, loaded from a related_comments BSON field. Define a Comment struct that contains a subset of the data contained in a comments document.

#[derive(Deserialize)]
struct MovieSummary {
    title: String,
    cast: Vec<String>,
    year: i32,
    #[serde(default, rename = "related_comments")]
    comments: Vec<Comment>,
}

#[derive(Debug, Deserialize)]
struct Comment {
    email: String,
    name: String,
    text: String,
}

Next, create a new pipeline from scratch, and start with the following:

// Look up related documents in the 'comments' collection:
let stage_lookup_comments = doc! {
    "$lookup": {
        "from": "comments",
        "localField": "_id",
        "foreignField": "movie_id",
        "as": "related_comments",
    }
};

// Limit to the first 5 documents:
let stage_limit_5 = doc! { "$limit": 5 };

let pipeline = vec![
    stage_lookup_comments,
    stage_limit_5,
];

let mut results = movies.aggregate(pipeline, None).await?;
// Loop through the results and print a summary and the comments:
while let Some(result) = results.next().await {
    let doc: MovieSummary = bson::from_document(result?)?;
    println!("* {}, comments={:?}", doc, doc.comments);
}

The stage I've called stage_lookup_comments is a $lookup stage. For each movie passing through, it looks up the documents in the comments collection whose movie_id matches the movie's _id. The matching comments are added to the movie document as an array, in a new field named related_comments.
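As a rough mental model, the behaviour of this $lookup can be sketched in plain Rust over two in-memory lists. The structs below are simplified, hypothetical stand-ins (integer ids instead of ObjectIds), and the lookup_comments function is not part of any library.

```rust
/// Hypothetical, simplified stand-in for a document in `comments`.
#[derive(Debug, Clone, PartialEq)]
pub struct Comment {
    pub movie_id: u32,
    pub text: String,
}

/// Hypothetical, simplified stand-in for a document in `movies`.
#[derive(Debug, Clone, PartialEq)]
pub struct Movie {
    pub id: u32,
    pub title: String,
    /// Filled in by the lookup; plays the role of `related_comments`.
    pub related_comments: Vec<Comment>,
}

/// For every movie, collect the comments whose `movie_id` equals the movie's id.
/// Movies without comments are kept, with an empty array.
pub fn lookup_comments(movies: Vec<Movie>, comments: &[Comment]) -> Vec<Movie> {
    movies
        .into_iter()
        .map(|mut movie| {
            movie.related_comments = comments
                .iter()
                .filter(|c| c.movie_id == movie.id)
                .cloned()
                .collect();
            movie
        })
        .collect()
}
```

Note that a movie with no matching comments still comes out of the function, just with an empty related_comments array, and that this naive version scans the whole comment list once per movie.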

I've added a $limit stage just to ensure that there's a reasonable amount of output without being overwhelming.

Now, execute the code.

You may notice that the pipeline above runs pretty slowly! There are two reasons for this:

  • There are 23.5k movie documents and 50k comments.
  • There's a missing index on the comments collection. It's missing on purpose, to teach you about indexes!

I'm not going to show you how to fix the index problem right now. I'll write about that in a later post in this series, focusing on indexes. Instead, I'll show you a trick for working with slow aggregation pipelines while you're developing.

Working with slow pipelines is a pain while you're writing and testing the pipeline. But, if you put a temporary $limit stage at the start of your pipeline, it will make the query faster (although the results may be different because you're not running on the whole dataset).

When I was writing this pipeline, I had a first stage of { "$limit": 1000 }.

When you have finished crafting the pipeline, you can comment out the first stage so that the pipeline will now run on the whole collection. Don't forget to remove the first stage, or you're going to get the wrong results!
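One way to make the temporary stage harder to forget is to keep it out of the stage list itself and add it through a small helper. This is just a sketch: build_pipeline and its parameters are hypothetical names, and the function is generic so that the stage type S can be a BSON document or anything else.

```rust
/// Builds the final list of stages, optionally putting a temporary limiting
/// stage first. `S` is whatever type your stages have (for example a BSON
/// document); the function and parameter names here are hypothetical.
pub fn build_pipeline<S>(dev_limit_stage: Option<S>, stages: Vec<S>) -> Vec<S> {
    let mut pipeline = Vec::with_capacity(stages.len() + 1);
    // Development only: shrink the input before the expensive stages run.
    if let Some(limit) = dev_limit_stage {
        pipeline.push(limit);
    }
    pipeline.extend(stages);
    pipeline
}
```

While developing, you might pass Some(doc! { "$limit": 1000 }) as the first argument, and switch it to None once the pipeline is finished.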

The aggregation pipeline above will print out summaries of five movie documents. I expect that most or all of your movie summaries will end with this: comments=[].

#Matching on Array Length

If you're lucky, you may have some documents in the array, but it's unlikely, as most of the movies have no comments. Now, I'll show you how to add some stages to match only movies which have more than two comments.

Ideally, you'd be able to add a single $match stage which obtained the length of the related_comments field and matched it against the expression { "$gt": 2 }. In this case, it's actually two steps:

  • Add a field (I'll call it comment_count) containing the length of the related_comments field.
  • Match where the value of comment_count is greater than two.

Here is the code for the two stages:

// Calculate the number of comments for each movie:
let stage_add_comment_count = doc! {
    "$addFields": {
        "comment_count": {
            "$size": "$related_comments"
        }
    }
};

// Match movie documents with more than 2 comments:
let stage_match_with_comments = doc! {
    "$match": {
        "comment_count": {
            "$gt": 2
        }
    }
};

The two stages go after the $lookup stage, and before the $limit 5 stage:

let pipeline = vec![
    stage_lookup_comments,
    stage_add_comment_count,
    stage_match_with_comments,
    stage_limit_5,
];
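The two new stages can be pictured as a map followed by a filter. Here is an in-memory sketch in plain Rust; MovieDoc and both functions are hypothetical names used only for illustration.

```rust
/// Hypothetical in-memory document: a title plus its looked-up comments.
#[derive(Debug, Clone, PartialEq)]
pub struct MovieDoc {
    pub title: String,
    pub related_comments: Vec<String>,
    /// Absent until the "add fields" step computes it.
    pub comment_count: Option<usize>,
}

/// Like `$addFields` with `$size`: store the array length on each document.
pub fn add_comment_count(docs: Vec<MovieDoc>) -> Vec<MovieDoc> {
    docs.into_iter()
        .map(|mut d| {
            d.comment_count = Some(d.related_comments.len());
            d
        })
        .collect()
}

/// Like `$match` with `$gt`: keep documents whose count exceeds `min`.
pub fn match_more_comments_than(docs: Vec<MovieDoc>, min: usize) -> Vec<MovieDoc> {
    docs.into_iter()
        .filter(|d| d.comment_count.map_or(false, |n| n > min))
        .collect()
}
```

A document that never had its count computed is filtered out here, which is why the counting step has to run before the matching step.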

While I'm here, I'm going to clean up the output of this code to format the comments slightly better:

let mut results = movies.aggregate(pipeline, None).await?;
// Loop through the results and print a summary and the comments:
while let Some(result) = results.next().await {
    let doc: MovieSummary = bson::from_document(result?)?;
    println!("* {}", doc);
    if !doc.comments.is_empty() {
        // Print a max of 5 comments per movie:
        for comment in doc.comments.iter().take(5) {
            println!(
                " - {} <{}>: {}",
                comment.name,
                comment.email,
                // Truncate each comment to its first 60 characters:
                comment.text.chars().take(60).collect::<String>(),
            );
        }
    } else {
        println!(" - No comments");
    }
}

Now when you run this code, you should see something more like this:

* Midnight, Claudette Colbert, 1939
 - Sansa Stark <sansa_stark@fakegmail.com>: Error ex culpa dignissimos assumenda voluptates vel. Qui inventore
 - Theon Greyjoy <theon_greyjoy@fakegmail.com>: Animi dolor minima culpa sequi voluptate. Possimus necessitatibu
 - Donna Smith <donna_smith@fakegmail.com>: Et esse nulla ducimus tempore aliquid. Suscipit iste dignissimos v

It's good to see Sansa Stark from Game of Thrones really knows her Latin, isn't it?

Now that I've shown you how to work with lookups in your pipelines, I'll show you how to use the $group stage to do actual aggregation.

#Grouping Documents with $group

I'll start with a new pipeline again.

The $group stage is one of the more difficult stages to understand, so I'll break this down slowly.

Start with the following code:

// Define a struct to hold grouped data by year:
#[derive(Debug, Deserialize)]
struct YearSummary {
    _id: i32,
    #[serde(default)]
    movie_count: i64,
    #[serde(default)]
    movie_titles: Vec<String>,
}

// Some movies have "year" values ending with 'è'.
// This stage will filter them out:
let stage_filter_valid_years = doc! {
    "$match": {
        "year": {
            "$type": "number",
        }
    }
};

/*
 * Group movies by year, producing 'year-summary' documents that look like:
 * {
 *     '_id': 1917,
 * }
 */
let stage_group_year = doc! {
    "$group": {
        "_id": "$year",
    }
};

let pipeline = vec![stage_filter_valid_years, stage_group_year];

// Execute the pipeline:
let mut results = movies.aggregate(pipeline, None).await?;
// Loop through the yearly summaries and print their debug representation:
while let Some(result) = results.next().await {
    let doc: YearSummary = bson::from_document(result?)?;
    println!("* {:?}", doc);
}

In the movies collection, some of the years contain the "è" character. This database has some messy values in it. In this case, there's only a small handful of documents, and I think we should just remove them, so I've added a $match stage that filters out any documents with a year that's not numeric.

Execute this code, and you should see something like this:

* YearSummary { _id: 1959, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1980, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1977, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1933, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1998, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1922, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1948, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1965, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1950, movie_count: 0, movie_titles: [] }
* YearSummary { _id: 1968, movie_count: 0, movie_titles: [] }
...

Each line is a document emitted from the aggregation pipeline. But you're not looking at movie documents anymore. The $group stage groups input documents by the specified _id expression and outputs one document for each unique _id value. In this case, the expression is $year, which means one document will be emitted for each unique value of the year field. Each document emitted can (and usually will) also contain values generated from aggregating data from the grouped documents. Currently, the YearSummary documents are using the default values for movie_count and movie_titles. Let's fix that.

Change the stage definition to the following:

let stage_group_year = doc! {
    "$group": {
        "_id": "$year",
        // Count the number of movies in the group:
        "movie_count": { "$sum": 1 },
    }
};

This will add a movie_count field, containing the result of adding 1 for every document in the group. In other words, it counts the number of movie documents in the group. If you execute the code now, you should see something like the following:

* YearSummary { _id: 2005, movie_count: 758, movie_titles: [] }
* YearSummary { _id: 1999, movie_count: 542, movie_titles: [] }
* YearSummary { _id: 1943, movie_count: 36, movie_titles: [] }
* YearSummary { _id: 1926, movie_count: 9, movie_titles: [] }
* YearSummary { _id: 1935, movie_count: 40, movie_titles: [] }
* YearSummary { _id: 1966, movie_count: 116, movie_titles: [] }
* YearSummary { _id: 1971, movie_count: 116, movie_titles: [] }
* YearSummary { _id: 1952, movie_count: 58, movie_titles: [] }
* YearSummary { _id: 2013, movie_count: 1221, movie_titles: [] }
* YearSummary { _id: 1912, movie_count: 2, movie_titles: [] }
...

There are a number of accumulator operators, like $sum, that allow you to summarize data from the group. If you wanted to build an array of all the movie titles in the emitted document, you could add "movie_titles": { "$push": "$title" }, to the $group stage. In that case, you would get YearSummary instances that look like this:

* YearSummary { _id: 1986, movie_count: 206, movie_titles: ["Defense of the Realm", "F/X", "Mala Noche", "Witch from Nepal", ... ]}
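If $group still feels abstract, it may help to see the same idea written by hand. The sketch below groups an in-memory slice by year in plain Rust; the Movie struct and the group_by_year function are hypothetical, and this YearSummary is a simplified cousin of the struct above, with the year held as the map key rather than in an _id field.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for a movie document.
#[derive(Debug, Clone, PartialEq)]
pub struct Movie {
    pub title: String,
    pub year: i32,
}

/// One output "document" per distinct year, like the `$group` stage emits.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct YearSummary {
    pub movie_count: i64,
    pub movie_titles: Vec<String>,
}

/// Groups by year; the map key plays the role of the group's `_id`.
pub fn group_by_year(movies: &[Movie]) -> BTreeMap<i32, YearSummary> {
    let mut groups: BTreeMap<i32, YearSummary> = BTreeMap::new();
    for movie in movies {
        let summary = groups.entry(movie.year).or_default();
        summary.movie_count += 1; // like { "$sum": 1 }
        summary.movie_titles.push(movie.title.clone()); // like { "$push": "$title" }
    }
    groups
}
```

I've used a BTreeMap here, so iterating over the result visits the years in ascending order. The documents coming out of the $group stage, as you can see in the output above, arrive in no particular order.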

Add the following stage to sort the results:

let stage_sort_year_ascending = doc! {
    "$sort": {"_id": 1}
};

let pipeline = vec![
    stage_filter_valid_years, // Match numeric years
    stage_group_year,
    stage_sort_year_ascending, // Sort by year (which is the unique _id field)
];

Note that the $match stage is added to the start of the pipeline, and the $sort is added to the end. A general rule is that you should filter documents out early in your pipeline, so that later stages have fewer documents to deal with. It also makes it more likely that the pipeline can take advantage of any appropriate indexes defined on the collection.

Remember, all of the sample code for this quick start series can be found on GitHub.

Aggregations using $group are a great way to discover interesting things about your data. In this example, I'm illustrating the number of movies made each year, but it would also be interesting to see information about movies for each country, or even look at the movies made by different actors.

#What Have You Learned?

You've learned how to construct aggregation pipelines to filter, group, and join documents with other collections. You've hopefully learned that putting a $limit stage at the start of your pipeline can be useful to speed up development (but should be removed before going to production). You've also learned some basic optimization tips, like putting filtering expressions towards the start of your pipeline instead of towards the end.

As you've gone through, you'll probably have noticed that there's a ton of different stage types, operators, and accumulator operators. Learning how to use the different components of aggregation pipelines is a big part of learning to use MongoDB effectively as a developer.

I love working with aggregation pipelines, and I'm always surprised at what you can do with them!

#Next Steps

Aggregation pipelines are super powerful, and because of this, they're a big topic to cover. Check out the full documentation to get a better idea of their full scope.

MongoDB University also offers a free online course on The MongoDB Aggregation Framework.

Note that aggregation pipelines can also be used to generate new data and write it back into a collection, with the $out stage.

MongoDB provides a free GUI tool called Compass. It allows you to connect to your MongoDB cluster, so you can browse through databases and analyze the structure and contents of your collections. It includes an aggregation pipeline builder which makes it easier to build aggregation pipelines. I highly recommend you install it, or if you're using MongoDB Atlas, use its similar aggregation pipeline builder in your browser. I often use them to build aggregation pipelines, and they include export buttons which will export your pipeline as Python code (which isn't too hard to transform into Rust).

I don't know about you, but when I was looking at some of the results above, I thought to myself, "It would be fun to visualise this with a chart." MongoDB provides a hosted service called Charts which just happens to take aggregation pipelines as input. So, now's a good time to give it a try!
