The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.
To learn more about aggregation, see Aggregation Pipeline in the Server manual.
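As a rough mental model of "data processing pipeline", the following plain-Scala sketch (not the MongoDB driver API) treats each stage as a function from one batch of documents to the next and runs the stages in order. The names Doc, Stage, matchStage, limitStage, and runPipeline are hypothetical, and the sample documents are made up.

```scala
// Conceptual sketch only: plain Scala, not the MongoDB driver API.
// A "document" is modeled as a Map, a stage as a function from one batch
// of documents to the next, and a pipeline as the stages applied in order.
type Doc = Map[String, Any]
type Stage = Seq[Doc] => Seq[Doc]

// Hypothetical stage builders that loosely mirror $match and $limit.
def matchStage(pred: Doc => Boolean): Stage = docs => docs.filter(pred)
def limitStage(n: Int): Stage = docs => docs.take(n)

// Feed the documents through each stage in order: the output of one
// stage becomes the input of the next.
def runPipeline(docs: Seq[Doc], stages: Seq[Stage]): Seq[Doc] =
  stages.foldLeft(docs)((acc, stage) => stage(acc))

// Made-up sample documents.
val input: Seq[Doc] = Seq(
  Map("name" -> "A", "stars" -> 4),
  Map("name" -> "B", "stars" -> 2),
  Map("name" -> "C", "stars" -> 5)
)

// Keep documents with at least 4 stars, then keep only the first one.
val atLeastFour: Doc => Boolean =
  _.get("stars").exists { case s: Int => s >= 4; case _ => false }

val output = runPipeline(input, Seq(matchStage(atLeastFour), limitStage(1)))
println(output) // List(Map(name -> A, stars -> 4))
```

Because each stage only sees the previous stage's output, reordering stages can change the result, which is also true of a real aggregation pipeline.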
Prerequisites
You must set up the following components to run the code examples in this guide:
A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub repository.
The following import statements:
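import org.mongodb.scala._
// Brings the Aggregates, Accumulators, Filters, and Projections objects
// themselves into scope for the qualified calls used in this guide
import org.mongodb.scala.model._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Accumulators._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Projections._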
Note
This guide uses the Observable implicits as covered in the
Quick Start Primer.
Connect to a MongoDB Deployment
First, connect to a MongoDB deployment, then declare and define
MongoDatabase and MongoCollection instances.
The following code connects to a standalone
MongoDB deployment running on localhost on port 27017. Then, it
defines the database variable to refer to the test database and
the collection variable to refer to the restaurants collection:
val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")
To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.
Perform Aggregation
To perform aggregation, pass a list of aggregation stages to the
MongoCollection.aggregate() method. The driver provides the
Aggregates helper class that contains builders for aggregation
stages.
In this example, the aggregation pipeline performs the following tasks:
Uses a $match stage to filter for documents in which the categories array field contains the element "Bakery". The example uses Aggregates.filter() to build the $match stage.
Uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group() to build the $group stage and Accumulators.sum() to build the accumulator expression. The driver provides accumulator expressions for the $group stage through the Accumulators helper class.
collection.aggregate(Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)).printResults()
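To make the result of the $match and $group stages above concrete, here is an in-memory analogue written with plain Scala collections rather than the driver. It is only a sketch of what the server computes, assuming a hypothetical Restaurant case class and made-up sample values; groupMapReduce requires Scala 2.13 or later.

```scala
// In-memory analogue of the pipeline above, using plain Scala collections
// rather than the driver:
//   { $match: { categories: "Bakery" } }
//   { $group: { _id: "$stars", count: { $sum: 1 } } }
// The Restaurant type and sample values are made up for illustration.
final case class Restaurant(name: String, stars: Int, categories: Seq[String])

val restaurants = Seq(
  Restaurant("Sugar Rush", 4, Seq("Bakery", "Cafe")),
  Restaurant("Crumbs", 4, Seq("Bakery")),
  Restaurant("Rye Society", 3, Seq("Bakery", "Deli")),
  Restaurant("Noodle Bar", 4, Seq("Asian"))
)

// $match: an equality filter on an array field matches documents whose
// array contains the value, which corresponds to `contains` here.
val bakeries = restaurants.filter(_.categories.contains("Bakery"))

// $group on stars with a { $sum: 1 } accumulator: map every document to 1,
// then add up the 1s per distinct stars value.
val countByStars: Map[Int, Int] =
  bakeries.groupMapReduce(_.stars)(_ => 1)(_ + _)

// Contains 4 -> 2 and 3 -> 1; like $group output, key order is not guaranteed.
println(countByStars)
```

Note that "Noodle Bar" has 4 stars but is dropped by the $match step, so it does not contribute to the count for 4.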
Use Aggregation Expressions
For $group accumulator expressions, the driver provides the
Accumulators helper class. For other aggregation expressions,
manually build the expression by using the Document class.
In the following example, the aggregation pipeline uses a
$project stage to return only the name field and the calculated
field firstCategory whose value is the first element in the
categories array. The example uses Aggregates.project() and
various Projections class methods to build the $project stage:
collection.aggregate(
  Seq(
    Aggregates.project(
      Projections.fields(
        Projections.excludeId(),
        Projections.include("name"),
        Projections.computed(
          "firstCategory",
          Document("$arrayElemAt" -> Seq("$categories", 0))
        )
      )
    )
  )
).printResults()
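The shape of the documents this $project stage returns can be sketched in memory with plain Scala (not the driver). The sketch assumes a hypothetical Restaurant case class and made-up values, and models each output document as a Map. It also reflects that $arrayElemAt returns no value when the index is out of bounds, so a restaurant with an empty categories array gets no firstCategory field.

```scala
// In-memory analogue of the $project stage above, using plain Scala
// rather than the driver:
//   { $project: { _id: 0, name: 1,
//                 firstCategory: { $arrayElemAt: ["$categories", 0] } } }
// The Restaurant type and sample values are made up for illustration.
final case class Restaurant(name: String, categories: Seq[String])

val restaurants = Seq(
  Restaurant("Sugar Rush", Seq("Bakery", "Cafe")),
  Restaurant("Empty Plate", Seq.empty)
)

// Keep only name and add firstCategory from index 0. When the array is
// empty, $arrayElemAt returns no value, so this sketch omits the field.
def project(r: Restaurant): Map[String, String] =
  Map("name" -> r.name) ++ r.categories.headOption.map("firstCategory" -> _)

val projected = restaurants.map(project)
projected.foreach(println)
```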
Explain an Aggregation
To explain an aggregation pipeline, call the
AggregateObservable.explain() method:
collection.aggregate(
  Seq(
    Aggregates.filter(Filters.eq("categories", "Bakery")),
    Aggregates.group("$stars", Accumulators.sum("count", 1))
  )
).explain().printResults()