The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.
To learn more about aggregation, see Aggregation Pipeline in the Server manual.
Prerequisites
You must set up the following components to run the code examples in this guide:
- A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub repository.
- The following import statements: 
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import java.util.Arrays;
Important
This guide uses custom Subscriber implementations, which are
described in the Sample Custom Subscriber Implementations guide.
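As a rough illustration of what such a subscriber does, the following sketch collects every published item and lets the caller wait for completion. It is written against the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces, so that it runs without the driver. The CollectingSubscriber class and its demo() helper are hypothetical names for this sketch and are not the sample classes from the driver documentation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a custom subscriber; not the driver's sample class.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable error;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Request everything up front; a real subscriber might request in batches.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable t) {
        error = t;
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Blocks until the publisher completes or fails, then returns the items.
    List<T> await(long timeout, TimeUnit unit) {
        try {
            if (!done.await(timeout, unit)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (error != null) {
            throw new IllegalStateException(error);
        }
        return List.copyOf(received);
    }

    // Demo helper: pushes the given items through a JDK SubmissionPublisher.
    static List<String> demo(List<String> items) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items are delivered
        return subscriber.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("first", "second")));
    }
}
```

Nothing is delivered until the subscriber calls request() in onSubscribe(). The same rule applies to the publishers that the driver returns, so an operation does not run until something subscribes to it.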
Connect to a MongoDB Deployment
First, connect to a MongoDB deployment, then declare and define
MongoDatabase and MongoCollection instances.
The following code connects to a standalone
MongoDB deployment running on localhost on port 27017. Then, it
defines the database variable to refer to the test database and
the collection variable to refer to the restaurants collection:
MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");
To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.
Perform Aggregation
To perform aggregation, pass a list of aggregation stages to the
MongoCollection.aggregate() method. The driver provides the
Aggregates helper class that contains builders for aggregation
stages.
In this example, the aggregation pipeline performs the following tasks:
- Uses a $match stage to filter for documents in which the categories array field contains the element "Bakery". The example uses Aggregates.match() to build the $match stage.
- Uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group() to build the $group stage and Accumulators.sum() to build the accumulator expression. For accumulator expressions used within the $group stage, the driver provides the Accumulators helper class.
collection.aggregate(
    Arrays.asList(
        Aggregates.match(Filters.eq("categories", "Bakery")),
        Aggregates.group("$stars", Accumulators.sum("count", 1))
    )
).subscribe(new PrintDocumentSubscriber());
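To make the semantics of the $match and $group stages concrete, the following sketch reproduces them on an in-memory list with Java streams. It does not use the driver or contact a server. The BakeryCounts and Restaurant classes and the sample data are hypothetical, and the sketch assumes that stars holds an integer.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class BakeryCounts {
    // Hypothetical in-memory stand-in for a restaurant document.
    static final class Restaurant {
        final String name;
        final int stars;
        final List<String> categories;

        Restaurant(String name, int stars, List<String> categories) {
            this.name = name;
            this.stars = stars;
            this.categories = categories;
        }
    }

    // Counts the restaurants that list the given category, keyed by star rating.
    static Map<Integer, Long> countByStars(List<Restaurant> docs, String category) {
        return docs.stream()
                // Like $match: keep documents whose categories array contains the element.
                .filter(r -> r.categories.contains(category))
                // Like $group on "$stars" with a sum of 1 per document.
                .collect(Collectors.groupingBy(r -> r.stars, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Restaurant> docs = List.of(
                new Restaurant("A", 4, List.of("Bakery", "Cafe")),
                new Restaurant("B", 4, List.of("Bakery")),
                new Restaurant("C", 5, List.of("Bakery")),
                new Restaurant("D", 5, List.of("Pizza")));
        System.out.println(countByStars(docs, "Bakery")); // prints {4=2, 5=1}
    }
}
```

The server evaluates the real pipeline, so only the grouped results travel to the client. The in-memory version is useful only for reasoning about what each stage contributes.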
Use Aggregation Expressions
For $group accumulator expressions, the driver provides the
Accumulators helper class. For other aggregation expressions,
manually build the expression by using the Document class.
In the following example, the aggregation pipeline uses a
$project stage to return only the name field and the calculated
field firstCategory whose value is the first element in the
categories array. The example uses Aggregates.project() and
various Projections class methods to build the $project stage:
collection.aggregate(
    Arrays.asList(
        Aggregates.project(
            Projections.fields(
                Projections.excludeId(),
                Projections.include("name"),
                Projections.computed(
                    "firstCategory",
                    new Document("$arrayElemAt", Arrays.asList("$categories", 0))
                )
            )
        )
    )
).subscribe(new PrintDocumentSubscriber());
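The shape of the documents that this $project stage produces can be sketched in plain Java. The FirstCategoryProjection class below is a hypothetical in-memory equivalent that does not use the driver. It assumes that every document has a name and a categories array, and it models the case where $arrayElemAt finds no element at index 0 by leaving the computed field out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FirstCategoryProjection {
    // Builds the projected document: name plus firstCategory, with no _id field.
    static Map<String, Object> project(String name, List<String> categories) {
        Map<String, Object> projected = new LinkedHashMap<>();
        projected.put("name", name);
        // Like {"$arrayElemAt": ["$categories", 0]}: take the element at index 0.
        // An out-of-bounds index yields no value, so the field is left out.
        if (!categories.isEmpty()) {
            projected.put("firstCategory", categories.get(0));
        }
        return projected;
    }

    public static void main(String[] args) {
        System.out.println(project("Sunrise Bakery", List.of("Bakery", "Cafe")));
        System.out.println(project("Unlabeled", List.of()));
    }
}
```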
Explain an Aggregation
To view the execution plan for an aggregation pipeline, call the
AggregatePublisher.explain() method:
collection.aggregate(
    Arrays.asList(
        Aggregates.match(Filters.eq("categories", "Bakery")),
        Aggregates.group("$stars", Accumulators.sum("count", 1))))
    .explain()
    .subscribe(new PrintDocumentSubscriber());
MongoDB Search
You can perform a MongoDB Search query by creating and running an aggregation pipeline that contains one of the following pipeline stages:
- $search
- $searchMeta
The Java Reactive Streams driver provides the Aggregates.search() and Aggregates.searchMeta() methods to perform MongoDB Search queries.
To learn more about MongoDB Search pipeline stages, see Choose the Aggregation Pipeline Stage in the Atlas documentation.
Create Pipeline Search Stages
You can create the search criteria in your MongoDB Search pipeline stage by using Search operators.
The Java Reactive Streams driver provides helper methods for the following operators:
| Operator | Description |
|---|---|
| autocomplete | Performs a search for a word or phrase that contains a sequence of characters from an incomplete input string. |
| compound | Combines two or more operators into a single query. |
| equals | Checks whether a field matches a value you specify. Maps to the equals() and equalsNull() methods. |
| exists | Tests if a path to a specified indexed field name exists in a document. |
| in | Performs a search for an array of BSON number, date, boolean, objectId, uuid, or string values at the given path and returns documents where the value of the field equals any value in the specified array. |
| moreLikeThis | Returns documents similar to input documents. |
| near | Supports querying and scoring numeric, date, and GeoJSON point values. |
| phrase | Performs a search for documents containing an ordered sequence of terms using the analyzer specified in the index configuration. |
| queryString | Supports querying a combination of indexed fields and values. |
| range | Supports querying and scoring numeric, date, and string values. Maps to the numberRange() and dateRange() methods. |
| regex | Interprets the query field as a regular expression. |
| text | Performs a full-text search using the analyzer that you specify in the index configuration. |
| wildcard | Enables queries which use special characters in the search string that can match any character. |
Example Pipeline Search Stage
Note
Atlas Sample Dataset
This example uses the sample_mflix.movies collection from the Atlas sample
datasets. To learn how to set up a free-tier Atlas cluster and load the
sample dataset, see the Get Started with Atlas tutorial
in the Atlas documentation.
Before you can run this example, you must create a MongoDB Search index on the movies
collection that has the following definition:
{
  "mappings": {
    "dynamic": true,
    "fields": {
      "title": {
        "analyzer": "lucene.keyword",
        "type": "string"
      },
      "genres": {
        "normalizer": "lowercase",
        "type": "token"
      }
    }
  }
}
To learn more about creating MongoDB Search indexes, see the MongoDB Search Index Management section of the Indexes guide.
The following code creates a $search stage that has the following
specifications:
- Checks that the genres array includes "Comedy"
- Searches the fullplot field for the phrase "new york"
- Matches year values between 1950 and 2000, exclusive
- Searches for title values that begin with the term "Love"
Bson searchStageFilters = Aggregates.search(
        SearchOperator.compound()
                .filter(
                        List.of(
                                SearchOperator.in(fieldPath("genres"), List.of("Comedy")),
                                SearchOperator.phrase(fieldPath("fullplot"), "new york"),
                                SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000),
                                SearchOperator.wildcard(fieldPath("title"), "Love *")
                        )));

Bson projection = Aggregates.project(Projections.fields(
        Projections.include("title", "year", "genres")
));

List<Bson> aggregateStages = List.of(searchStageFilters, projection);
Publisher<Document> publisher = movies.aggregate(aggregateStages);
publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber());
Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979}
{"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
To learn more about the MongoDB Search helper methods, see the SearchOperator interface reference in the Driver Core API documentation.
Additional Information
To view a full list of expression operators, see Aggregation Operators in the MongoDB Server manual.
To learn about assembling an aggregation pipeline and view examples, see Aggregation Pipeline in the MongoDB Server manual.
To learn more about creating pipeline stages, see Aggregation Stages in the MongoDB Server manual.
To learn more about explaining MongoDB operations, see Explain Output and Query Plans in the MongoDB Server manual.
API Documentation
To learn more about the classes and methods mentioned in this guide, see the following API documentation: