The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.
To learn more about aggregation, see Aggregation Pipeline in the Server manual.
Prerequisites
You must set up the following components to run the code examples in this guide:
- A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub repository.
- The following import statements:
import java.util.Arrays;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import org.bson.Document;
Important
This guide uses custom Subscriber implementations, which are
described in the Sample Custom Subscriber Implementations guide.
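The subscribers in this guide follow the Reactive Streams pattern: request items when subscribed, handle each item as it arrives, and react to completion or failure. The following is a minimal sketch of that pattern, assuming the JDK's java.util.concurrent.Flow interfaces in place of the driver's org.reactivestreams types so that it runs without the driver. The class name CollectingSubscriber and its demo() helper are hypothetical and are not the PrintDocumentSubscriber used in this guide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber: collects every item and lets callers wait for the stream to end.
class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> received = new ArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable error;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Request everything up front; a real subscriber may request in batches.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        synchronized (received) {
            received.add(item);
        }
    }

    @Override
    public void onError(Throwable t) {
        error = t;
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Waits for completion, then returns the collected items or rethrows the failure.
    List<T> await(long timeout, TimeUnit unit) {
        try {
            if (!done.await(timeout, unit)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (error != null) {
            throw new IllegalStateException(error);
        }
        synchronized (received) {
            return new ArrayList<>(received);
        }
    }

    // Demo helper: publishes the given items and returns what the subscriber collected.
    static List<String> demo(List<String> items) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        return subscriber.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

The latch is what lets a caller block until the asynchronous stream finishes, which is useful in short example programs that would otherwise exit before any results arrive.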
Connect to a MongoDB Deployment
First, connect to a MongoDB deployment, then declare and define
MongoDatabase and MongoCollection instances.
The following code connects to a standalone
MongoDB deployment running on localhost on port 27017. Then, it
defines the database variable to refer to the test database and
the collection variable to refer to the restaurants collection:
MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");
To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.
Perform Aggregation
To perform aggregation, pass a list of aggregation stages to the
MongoCollection.aggregate() method. The driver provides the
Aggregates helper class that contains builders for aggregation
stages.
In this example, the aggregation pipeline performs the following tasks:
- Uses a $match stage to filter for documents in which the categories array field contains the element "Bakery". The example uses Aggregates.match() to build the $match stage.
- Uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group() to build the $group stage and Accumulators.sum() to build the accumulator expression. The driver provides the Accumulators helper class for building the accumulator expressions used within the $group stage.
collection.aggregate(
    Arrays.asList(
        Aggregates.match(Filters.eq("categories", "Bakery")),
        Aggregates.group("$stars", Accumulators.sum("count", 1))
    )
).subscribe(new PrintDocumentSubscriber());
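To make the result of this pipeline concrete, the following sketch reproduces the same filter-then-count logic in memory with Java streams. It is an illustration of what $match and $group compute, not how the server executes them. The class MatchGroupSketch, the doc() helper, and the sample values are hypothetical, and each document is modeled as a plain Map with only the two fields the pipeline reads.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class MatchGroupSketch {
    // Hypothetical helper: builds a minimal stand-in for a restaurant document.
    static Map<String, Object> doc(List<String> categories, int stars) {
        return Map.of("categories", categories, "stars", stars);
    }

    // Mirrors $match on categories followed by $group on stars with a sum-of-1 count.
    static Map<Integer, Long> countByStars(List<Map<String, Object>> docs, String category) {
        return docs.stream()
                // $match: keep documents whose categories array contains the element
                .filter(d -> ((List<?>) d.get("categories")).contains(category))
                // $group: one entry per distinct stars value, counting its documents
                .collect(Collectors.groupingBy(
                        d -> (Integer) d.get("stars"),
                        TreeMap::new,
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = List.of(
                doc(List.of("Bakery", "Cafe"), 4),
                doc(List.of("Bakery"), 4),
                doc(List.of("Pizza"), 5),
                doc(List.of("Bakery"), 3));
        System.out.println(countByStars(docs, "Bakery")); // prints {3=1, 4=2}
    }
}
```

Each map entry corresponds to one output document of the pipeline: the key plays the role of _id and the value plays the role of count.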
Use Aggregation Expressions
For $group accumulator expressions, the driver provides the
Accumulators helper class. For other aggregation expressions,
manually build the expression by using the Document class.
In the following example, the aggregation pipeline uses a
$project stage to return only the name field and the calculated
field firstCategory whose value is the first element in the
categories array. The example uses Aggregates.project() and
various Projections class methods to build the $project stage:
collection.aggregate(
    Arrays.asList(
        Aggregates.project(
            Projections.fields(
                Projections.excludeId(),
                Projections.include("name"),
                Projections.computed(
                    "firstCategory",
                    new Document("$arrayElemAt", Arrays.asList("$categories", 0))
                )
            )
        )
    )
).subscribe(new PrintDocumentSubscriber());
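As a rough illustration of the shape this $project stage produces, the sketch below applies the same three rules to an in-memory document: drop _id, keep name, and compute firstCategory from the first element of categories. The class ProjectSketch is hypothetical, and the sketch assumes that categories is always present and is an array. When the array is empty, the sketch omits firstCategory, which matches the documented behavior of $arrayElemAt for an out-of-bounds index.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ProjectSketch {
    // Mirrors excludeId() + include("name") + computed("firstCategory", $arrayElemAt[categories, 0]).
    static Map<String, Object> project(Map<String, Object> doc) {
        Map<String, Object> out = new LinkedHashMap<>();
        // include("name"): copy the field only if the input document has it
        if (doc.containsKey("name")) {
            out.put("name", doc.get("name"));
        }
        // computed field: first element of the array, omitted when index 0 is out of bounds
        List<?> categories = (List<?>) doc.get("categories");
        if (!categories.isEmpty()) {
            out.put("firstCategory", categories.get(0));
        }
        // excludeId(): _id is never copied into the output
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("_id", 1);
        doc.put("name", "Sample Bakery");
        doc.put("categories", List.of("Bakery", "Cafe"));
        doc.put("stars", 4);
        System.out.println(project(doc)); // prints {name=Sample Bakery, firstCategory=Bakery}
    }
}
```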
Explain an Aggregation
To explain an aggregation pipeline, call the
AggregatePublisher.explain() method:
collection.aggregate(
    Arrays.asList(
        Aggregates.match(Filters.eq("categories", "Bakery")),
        Aggregates.group("$stars", Accumulators.sum("count", 1))))
    .explain()
    .subscribe(new PrintDocumentSubscriber());
Atlas Search
You can perform an Atlas Search query by creating and running an aggregation pipeline that contains one of the following pipeline stages:
- $search
- $searchMeta
The Java Reactive Streams driver provides the Aggregates.search() and Aggregates.searchMeta() methods to perform Atlas Search queries.
To learn more about Atlas Search pipeline stages, see Choose the Aggregation Pipeline Stage in the Atlas documentation.
Create Pipeline Search Stages
You can create the search criteria in your Atlas Search pipeline stage by using Search operators.
The Java Reactive Streams driver provides helper methods for the following operators:
| Operator | Description |
|---|---|
| autocomplete | Performs a search for a word or phrase that contains a sequence of characters from an incomplete input string. |
| compound | Combines two or more operators into a single query. |
| equals | Checks whether a field matches a value you specify. Maps to the equals() and equalsNull() methods. |
| exists | Tests if a path to a specified indexed field name exists in a document. |
| in | Performs a search for an array of BSON number, date, boolean, objectId, uuid, or string values at the given path and returns documents where the value of the field equals any value in the specified array. |
| moreLikeThis | Returns documents similar to input documents. |
| near | Supports querying and scoring numeric, date, and GeoJSON point values. |
| phrase | Performs a search for documents containing an ordered sequence of terms using the analyzer specified in the index configuration. |
| queryString | Supports querying a combination of indexed fields and values. |
| range | Supports querying and scoring numeric, date, and string values. Maps to the numberRange() and dateRange() methods. |
| regex | Interprets the query field as a regular expression. |
| text | Performs a full-text search using the analyzer that you specify in the index configuration. |
| wildcard | Enables queries which use special characters in the search string that can match any character. |
Example Pipeline Search Stage
Note
Atlas Sample Dataset
This example uses the sample_mflix.movies collection from the Atlas sample
datasets. To learn how to set up a free-tier Atlas cluster and load the
sample dataset, see the Get Started with Atlas tutorial
in the Atlas documentation.
Before you can run this example, you must create a MongoDB Search index on the movies
collection that has the following definition:
{
  "mappings": {
    "dynamic": true,
    "fields": {
      "title": {
        "analyzer": "lucene.keyword",
        "type": "string"
      },
      "genres": {
        "normalizer": "lowercase",
        "type": "token"
      }
    }
  }
}
To learn more about creating MongoDB Search indexes, see the Atlas Search Index Management section of the Indexes guide.
The following code creates a $search stage that has the following
specifications:
- Checks that the genres array includes "Comedy"
- Searches the fullplot field for the phrase "new york"
- Matches year values greater than 1950 and less than 2000 (the gtLt() method excludes both bounds)
- Searches for title values that begin with the term "Love"
// Requires: import static com.mongodb.client.model.search.SearchPath.fieldPath;
// movies is a MongoCollection<Document> that refers to sample_mflix.movies
Bson searchStageFilters = Aggregates.search(
    SearchOperator.compound()
        .filter(
            List.of(
                SearchOperator.in(fieldPath("genres"), List.of("Comedy")),
                SearchOperator.phrase(fieldPath("fullplot"), "new york"),
                SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000),
                SearchOperator.wildcard(fieldPath("title"), "Love *")
            )));

Bson projection = Aggregates.project(Projections.fields(
    Projections.include("title", "year", "genres")
));

List<Bson> aggregateStages = List.of(searchStageFilters, projection);

Publisher<Document> publisher = movies.aggregate(aggregateStages);
publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber());
// Subscribes a second time and blocks until the first document or completion signal arrives
Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979}
{"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
To learn more about the MongoDB Search helper methods, see the SearchOperator interface reference in the Driver Core API documentation.
Additional Information
To view a full list of expression operators, see Aggregation Operators in the MongoDB Server manual.
To learn about assembling an aggregation pipeline and view examples, see Aggregation Pipeline in the MongoDB Server manual.
To learn more about creating pipeline stages, see Aggregation Stages in the MongoDB Server manual.
To learn more about explaining MongoDB operations, see Explain Output and Query Plans in the MongoDB Server manual.
API Documentation
To learn more about the classes and methods mentioned in this guide, see the following API documentation: