聚合管道是一个数据聚合框架,以数据处理管道的概念为模型。
要学习;了解有关聚合的更多信息,请参阅服务器手册中的聚合管道。
先决条件
您必须设置以下组件才能运行本指南中的代码示例:
一个
test.restaurants集合,其中填充了来自文档资产Github中restaurants.json文件的文档。以下 import 语句:
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Accumulators; import com.mongodb.client.model.Projections; import com.mongodb.client.model.Filters; import org.bson.Document;
重要
本指南使用自定义Subscriber实现,如自定义订阅者实现示例指南中所述。
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义MongoDatabase和MongoCollection实例。
以下代码连接到在端口27017上的localhost上运行的独立 MongoDB 部署。 然后,定义database变量以引用test数据库,并collection变量以引用restaurants集合:
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB教程。
执行聚合
要执行聚合,请将聚合阶段列表传递给 MongoCollection.aggregate() 方法。驱动程序提供了 Aggregates 辅助工具类,其中包含聚合阶段的构建者。
在此示例中,聚合管道执行以下任务:
使用
$match阶段筛选categories数组字段包含元素"Bakery"的文档。 该示例使用Aggregates.match()构建$match阶段。
使用
$group阶段按stars字段对匹配文档进行群组,并累积每个不同的stars值的文档计数。该示例使用Aggregates.group()构建$group阶段,并使用Accumulators.sum()构建累加器表达式。对于在$group阶段使用的累加器表达式,驱动程序提供了Accumulators辅助工具类。
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)) ) ).subscribe(new PrintDocumentSubscriber());
使用聚合表达式
对于 $group 累加器表达式,驱动程序提供了 Accumulators 辅助工具类。对于其他聚合表达式,请使用 Document 类手动构建表达式。
在以下示例中,聚合管道使用$project阶段仅返回name字段和计算字段firstCategory ,其值是categories数组中的第一个元素。 该示例使用Aggregates.project()和各种Projections类方法构建$project阶段:
collection.aggregate( Arrays.asList( Aggregates.project( Projections.fields( Projections.excludeId(), Projections.include("name"), Projections.computed( "firstCategory", new Document("$arrayElemAt", Arrays.asList("$categories", 0)) ) ) ) ) ).subscribe(new PrintDocumentSubscriber());
解释聚合
要$explain聚合管道,请调用AggregatePublisher.explain()方法:
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)))) .explain() .subscribe(new PrintDocumentSubscriber());