聚合管道是一个数据聚合框架,以数据处理管道的概念为模型。
要学习;了解有关聚合的更多信息,请参阅服务器手册中的聚合管道。
先决条件
您必须设置以下组件才能运行本指南中的代码示例:
一个
test.restaurants集合,其中填充了来自文档资产Github中restaurants.json文件的文档。以下 import 语句:
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Accumulators; import com.mongodb.client.model.Projections; import com.mongodb.client.model.Filters; import org.bson.Document;
重要
本指南使用自定义Subscriber实现,如自定义订阅者实现示例指南中所述。
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义MongoDatabase和MongoCollection实例。
以下代码连接到在端口27017上的localhost上运行的独立 MongoDB 部署。 然后,定义database变量以引用test数据库,并collection变量以引用restaurants集合:
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB教程。
执行聚合
要执行聚合,请将聚合阶段列表传递给 MongoCollection.aggregate() 方法。驱动程序提供了 Aggregates 辅助工具类,其中包含聚合阶段的构建者。
在此示例中,聚合管道执行以下任务:
使用
$match阶段筛选categories数组字段包含元素"Bakery"的文档。 该示例使用Aggregates.match()构建$match阶段。
使用
$group阶段按stars字段对匹配文档进行群组,并累积每个不同的stars值的文档计数。该示例使用Aggregates.group()构建$group阶段,并使用Accumulators.sum()构建累加器表达式。对于在$group阶段使用的累加器表达式,驱动程序提供了Accumulators辅助工具类。
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)) ) ).subscribe(new PrintDocumentSubscriber());
使用聚合表达式
对于 $group 累加器表达式,驱动程序提供了 Accumulators 辅助工具类。对于其他聚合表达式,请使用 Document 类手动构建表达式。
在以下示例中,聚合管道使用$project阶段仅返回name字段和计算字段firstCategory ,其值是categories数组中的第一个元素。 该示例使用Aggregates.project()和各种Projections类方法构建$project阶段:
collection.aggregate( Arrays.asList( Aggregates.project( Projections.fields( Projections.excludeId(), Projections.include("name"), Projections.computed( "firstCategory", new Document("$arrayElemAt", Arrays.asList("$categories", 0)) ) ) ) ) ).subscribe(new PrintDocumentSubscriber());
解释聚合
要$explain聚合管道,请调用AggregatePublisher.explain()方法:
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)))) .explain() .subscribe(new PrintDocumentSubscriber());
MongoDB Search
您可以通过创建并运行包含以下管道阶段之一的聚合管道来执行MongoDB搜索查询:
$search$searchMeta
Java Reactive Streams驾驶员提供聚合。 搜索() 和 Aggregates.searchMeta() 方法执行MongoDB搜索查询。
要学习;了解有关MongoDB搜索管道阶段的更多信息,请参阅Atlas文档中的选择聚合管道阶段。
创建管道搜索阶段
您可以使用“搜索”操作符在“MongoDB搜索”管道阶段中创建搜索条件。
Java Reactive Streams驱动程序为以下操作符提供了辅助方法:
Operator | 说明 |
|---|---|
从不完整输入字符串中搜索包含字符序列的单词或短语。 | |
将两个或多个操作符组合到一个查询中。 | |
检查字段是否与您指定的值匹配。映射到 | |
测试指定索引字段名称的路径在文档中是否存在。 | |
在给定路径搜索由BSON数字、日期、布尔值、ObjectId、uuid 或字符串值组成的大量,并返回该字段的值等于指定大量中任意值的文档。 | |
返回与输入文档类似的文档。 | |
支持对数字、日期和GeoJSON point值进行查询和评分。 | |
使用索引配置中指定的分析器搜索包含有序术语序列的文档。 | |
支持查询索引字段和值的组合。 | |
支持对数字、日期和字符串值进行查询和评分。 映射到 | |
将查询字段解释为正则表达式。 | |
使用在索引配置中指定的分析器执行全文搜索。 | |
启用在搜索字符串中使用可匹配任何字符的特殊字符的查询。 |
管道搜索阶段示例
注意
Atlas样本数据集
此示例使用Atlas示例数据集中的 sample_mflix.movies集合。要学习;了解如何设立免费套餐的Atlas 集群并加载示例数据集,请参阅Atlas文档中的Atlas入门教程。
在运行此示例之前,您必须在具有以下定义的 movies集合上创建MongoDB搜索索引:
{ "mappings": { "dynamic": true, "fields": { "title": { "analyzer": "lucene.keyword", "type": "string" }, "genres": { "normalizer": "lowercase", "type": "token" } } } }
要学习;了解有关创建MongoDB Search 索引的更多信息,请参阅 索引指南的MongoDB Search 索引管理部分。
以下代码创建具有以下规范的 $search 阶段:
检查
genres大量是否包含"Comedy"在
fullplot字段中搜索字段"new york"匹配介于
1950和2000(含)之间的year值搜索以术语
"Love"开头的title值
Bson searchStageFilters = Aggregates.search( SearchOperator.compound() .filter( List.of( SearchOperator.in(fieldPath("genres"), List.of("Comedy")), SearchOperator.phrase(fieldPath("fullplot"), "new york"), SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000), SearchOperator.wildcard(fieldPath("title"), "Love *") ))); Bson projection = Aggregates.project(Projections.fields( Projections.include("title", "year", "genres") )); List<Bson> aggregateStages = List.of(searchStageFilters, projection); Publisher<Document> publisher = movies.aggregate(aggregateStages); publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber()); Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979} {"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
要学习;了解有关MongoDB Search 助手方法的更多信息,请参阅驱动程序核心API文档中的 SearchOperator 接口参考。
更多信息
有关聚合阶段的完整列表,请参阅MongoDB Server手册中的聚合阶段。
要学习;了解如何组装聚合管道并查看示例,请参阅MongoDB Server手册中的聚合管道。
要学习;了解有关解释MongoDB操作的更多信息,请参阅MongoDB Server手册中的解释输出和查询计划。
API 文档
要学习;了解有关本指南中提到的类和方法的更多信息,请参阅以下API文档: