Overview
在本指南中,您可以了解如何使用 Scala 驱动程序来执行聚合操作。
聚合操作处理 MongoDB 集合中的数据并返回计算结果。 MongoDB 聚合框架是 Query API 的一部分,以数据处理管道的概念为模型。 文档进入包含一个或多个阶段的管道,该管道将文档转换为聚合结果。
类比
聚合操作类似于汽车工厂。汽车工厂有一条装配线,其中包含配备专用工具的装配站,用于完成特定的工作,例如钻机和焊机。毛坯零件会进入工厂,然后装配线将其转换并组装为成品。
聚合管道是装配线,聚合阶段是装配站,操作符表达式则是专用工具。
比较聚合与查找操作
下表列出了查找操作可以执行的不同任务,并将它们与聚合操作可以执行的任务进行了比较。 聚合框架提供了扩展功能,允许您转换和操作数据。
查找操作 | 聚合操作 |
|---|---|
选择要返回的特定文档 | 选择要返回的特定文档 |
限制
执行聚合操作时要考虑以下限制:
返回的文档不能违反16 MB 的BSON文档大小限制。
默认下,管道阶段的内存限制为 100 MB。 您可以通过将
true值传递给allowDiskUse()方法并将该方法链接到aggregate()来超出此限制。$graphLookup操作符有 100兆字节的严格内存限制,并忽略传递给
allowDiskUse()方法的值。
运行聚合操作
注意
样本数据
本指南中的示例使用Atlas示例数据集的 sample_restaurants数据库中的 restaurants集合。要学习如何创建免费的MongoDB Atlas 集群并加载示例数据集,请参阅Atlas入门。
要执行聚合,请将包含管道阶段的列表传递给 aggregate() 方法。Scala驱动程序提供了 Aggregates 类,其中包括用于构建管道阶段的辅助方法。
要学习;了解有关管道阶段及其相应 Aggregates 辅助方法的详情,请参阅以下资源:
对文档进行筛选、分组和计数
此代码示例计算纽约每个区的面包店数量。 为此,它调用 aggregate() 方法并将聚合管道作为阶段列表传递。 代码使用以下 Aggregates 辅助方法构建这些阶段:
filter():构建 $match 阶段以过滤cuisine值为 的文档"Bakery"group():构建 $ 群组阶段以按borough字段对匹配文档进行群组,从而累积每个不同值的文档计数
val pipeline = Seq(Aggregates.filter(Filters.equal("cuisine", "Bakery")), Aggregates.group("$borough", Accumulators.sum("count", 1)) ) collection.aggregate(pipeline) .subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"))
{"_id": "Brooklyn", "count": 173} {"_id": "Queens", "count": 204} {"_id": "Bronx", "count": 71} {"_id": "Staten Island", "count": 20} {"_id": "Missing", "count": 2} {"_id": "Manhattan", "count": 221}
解释聚合
要查看有关MongoDB如何执行您的操作的信息,您可以指示MongoDB查询规划器对其进行解释。MongoDB解释操作时,会返回执行计划和性能统计信息。执行计划是MongoDB完成操作的一种潜在方式。 当您指示MongoDB解释一个操作时,默认下它会返回MongoDB执行的计划和任何被拒绝的执行计划。
要解释聚合操作,请将 explain() 方法链接到 aggregate() 方法。 您可以向 explain() 传递详细程度,从而修改该方法返回的信息类型和数量。 有关详细程度的更多信息,请参阅MongoDB Server手册中的详细程度模式。
以下示例指示MongoDB解释前面的筛选器、分组和计数文档示例中的聚合操作。此代码会将详细程度值 ExplainVerbosity.EXECUTION_STATS 传递给 explain() 方法,从而将该方法配置为返回描述获胜计划执行情况的统计信息:
val pipelineToExplain = Seq(Aggregates.filter(Filters.equal("cuisine", "Bakery")), Aggregates.group("$borough", Accumulators.sum("count", 1)) ) collection.aggregate(pipelineToExplain) .explain(ExplainVerbosity.EXECUTION_STATS) .subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"))
{"explainVersion": "2", "queryPlanner": {"namespace": "sample_restaurants.restaurants", "indexFilterSet": false, "parsedQuery": {"cuisine": {"$eq": "Bakery"}}, "queryHash": "865F14C3", "planCacheKey": "0FC225DA", "optimizedPipeline": true, "maxIndexedOrSolutionsReached": false, "maxIndexedAndSolutionsReached": false, "maxScansToExplodeReached": false, "winningPlan": {"queryPlan": {"stage": "GROUP", "planNodeId": 3, "inputStage": {"stage": "COLLSCAN", "planNodeId": 1, "filter": {"cuisine": {"$eq": "Bakery"}}, "direction": "forward"}}, ...}
使用MongoDB Search 运行全文搜索
要指定对一个或多个字段进行全文搜索,可以创建 $search管道阶段。Scala 驱动程序提供了 Aggregates.search() 辅助工具方法来创建此阶段。search() 方法需要以下参数:
SearchOperator实例:指定要搜索的字段和文本。SearchOptions实例:指定用于自定义全文搜索的选项。您必须将index选项设立为要使用的 MongoDB 搜索索引 的名称。
此示例创建管道阶段来执行以下操作:
在
name字段中搜索包含单词"Salt"的文本仅投影匹配文档的
_id和name值
val operator = SearchOperator.text(SearchPath.fieldPath("name"), "Salt") val options = searchOptions().index("<search index name>") val pipeline = Seq(Aggregates.search(operator, options), Aggregates.project(Projections.include("name"))) collection.aggregate(pipeline) .subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"))
{"_id": {"$oid": "..."}, "name": "Fresh Salt"} {"_id": {"$oid": "..."}, "name": "Salt & Pepper"} {"_id": {"$oid": "..."}, "name": "Salt + Charcoal"} {"_id": {"$oid": "..."}, "name": "A Salt & Battery"} {"_id": {"$oid": "..."}, "name": "Salt And Fat"} {"_id": {"$oid": "..."}, "name": "Salt And Pepper Diner"}
重要
要运行前面的示例,您必须在涵盖 name 字段的 restaurants 集合上创建 MongoDB 搜索索引。然后,将 "<search index name>" 占位符替换为索引的名称。要学习有关MongoDB搜索索引的更多信息,请参阅MongoDB搜索索引指南。
搜索操作符辅助方法
Scala驱动程序为以下操作符提供了辅助方法:
Operator | 说明 |
|---|---|
从不完整输入字符串中搜索包含字符序列的单词或短语。 | |
将两个或多个操作符组合到一个查询中。 | |
检查字段是否与您指定的值匹配。映射到 | |
测试指定索引字段名称的路径在文档中是否存在。 | |
在给定路径搜索由BSON数字、日期、布尔值、ObjectId、uuid 或字符串值组成的大量,并返回该字段的值等于指定大量中任意值的文档。 | |
返回与输入文档类似的文档。 | |
支持对数字、日期和GeoJSON point值进行查询和评分。 | |
使用索引配置中指定的分析器搜索包含有序术语序列的文档。 | |
支持查询索引字段和值的组合。 | |
支持对数字、日期和字符串值进行查询和评分。 映射到 | |
将查询字段解释为正则表达式。 | |
使用在索引配置中指定的分析器执行全文搜索。 | |
启用在搜索字符串中使用可匹配任何字符的特殊字符的查询。 |
管道搜索阶段示例
在运行此示例之前,您必须在具有以下定义的 movies 集合上创建 MongoDB 搜索索引:
{ "mappings": { "dynamic": true, "fields": { "title": { "analyzer": "lucene.keyword", "type": "string" }, "genres": { "normalizer": "lowercase", "type": "token" } } } }
要学习有关创建MongoDB搜索索引的更多信息,请参阅MongoDB搜索索引指南。
以下代码创建具有以下规范的 $search 阶段:
检查
genres大量是否包含"Comedy"在
fullplot字段中搜索字段"new york"匹配介于
1950和2000(含)之间的year值搜索以术语
"Love"开头的title值
val searchStage = Aggregates.search( SearchOperator.compound() .must( Iterable( SearchOperator.in(fieldPath("genres"), List("Comedy")), SearchOperator.phrase(fieldPath("fullplot"), "new york"), SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000), SearchOperator.wildcard("Love *", fieldPath("title")), ).asJava ) ) val projectStage = Aggregates.project( Projections.include("title", "year", "genres")) val aggregatePipelineStages = Seq(searchStage, projectStage) collection.aggregate(aggregatePipelineStages) .subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"))
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979} {"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
要学习有关MongoDB 搜索辅助程序方法的更多信息,请参阅驱动程序 Core API 文档中的SearchOperator接口参考。
更多信息
MongoDB Server 手册
要学习;了解有关本指南所讨论主题的更多信息,请参阅MongoDB Server手册中的以下页面:
API 文档
要进一步了解本指南所讨论的方法和类型,请参阅以下 API 文档: