对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

聚合构建器

在本指南中,您可以学习;了解如何使用 Aggregates 类,该类提供了在Java Reactive Streams驾驶员中构建聚合管道阶段的静态工厂方法。

有关聚合的更全面介绍,请参阅 聚合指南。

本页上的示例假定导入了以下类的方法:

  • Aggregates

  • Filters

  • Projections

  • Sorts

  • Accumulators

以下代码演示了如何导入上述类的方法:

import static java.util.Arrays.asList;
import static com.mongodb.client.model.Accumulators.*;
import static com.mongodb.client.model.Aggregates.*;
import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Projections.*;
import static com.mongodb.client.model.Sorts.*;
import static com.mongodb.client.model.search.SearchPath.fieldPath;

使用这些方法构建管道阶段,并在聚合中以列表形式予以指定:

Bson matchStage = match(eq("some_field", "some_criteria"));
Bson sortByCountStage = sortByCount("some_field");
Publisher<Document> results = collection.aggregate(asList(matchStage, sortByCountStage));
Flux.from(results).collectList().block();

使用match() 方法创建 $match管道阶段,将传入文档与指定的查询过滤进行匹配,并筛选掉不匹配的文档。您可以使用 Filters 类或任何其他Bson 实例构造过滤。

以下示例创建一个管道阶段,用于匹配 title 字段等于“The Shawshank Redemption”的所有文档:

match(eq("title", "The Shawshank Redemption"));

使用project() 方法创建用于项目指定文档字段的 $ 项目管道阶段。聚合中的字段投影遵循与查询中的字段投影相同的规则。您可以使用 Projections 类或任何其他Bson 实例构造投影。

以下示例创建一个管道阶段,其中不包括 _id 字段,但包括 titleplot 字段:

project(fields(include("title", "plot"), excludeId()));

$project 阶段可以输出现有字段或计算字段。

以下示例创建一个管道阶段,将 rated 字段投影到名为 rating 的新字段,从而有效地重命名该字段。

project(fields(computed("rating", "$rated"), excludeId()));

使用 documents() 方法创建 $documents 管道阶段,从输入值返回字面文档。

重要

如果在聚合管道中使用 $documents 阶段,则必须是管道中的第一个阶段。

以下示例创建一个管道阶段,用于创建带有 title 字段的示例文档:

documents(asList(
new Document("title", "The Shawshank Redemption"),
new Document("title", "Back to the Future"),
new Document("title", "Jurassic Park")));

重要

如果您使用 documents() 方法向聚合管道提供输入,则必须对数据库而不是集合调用 aggregate() 方法。

使用 sample() 方法创建 $sample 管道阶段,从输入中随机选择文档。

以下示例创建一个管道阶段,用于随机选择 5 个文档:

sample(5);

使用sort() 方法创建 $sort管道阶段,以按指定条件排序。您可以使用 Sorts 类或任何其他Bson 实例构造排序条件。

以下示例创建了一个管道阶段,该阶段根据 year字段的值按降序排序:

sort(orderBy(descending("year"), ascending("title")));

使用 skip() 方法创建一个 $skip 管道阶段,以在将文档传递到下一阶段之前跳过指定数量的文档。

以下示例创建了一个跳过前 5 个文档的管道阶段:

skip(5);

使用 $limit 管道阶段来限制传递到下一阶段的文档数量。

以下示例将创建一个管道阶段,从而将文档数量限制为 10 个:

limit(10);

使用 lookup() 方法创建 $lookup 管道阶段,在两个集合之间执行连接和非相关子查询。

以下示例创建了一个管道阶段,该阶段在 moviescomments 集合之间执行左外连接:

  • 它将 movies 中的 _id 字段与 comments 中的 movie_id 字段连接起来

  • 它会在 joined_comments 字段中输出结果:

lookup("comments", "_id", "movie_id", "joined_comments");

以下示例创建了一个管道阶段,该阶段对 movies集合执行自连接,以查找与每部电影股票类型且具有较高 imdb.rating 的电影:

List<Variable<Object>> variables = asList(
new Variable<>("genre", new Document("$arrayElemAt", asList("$genres", 0))),
new Variable<>("rating", "$imdb.rating")
);
List<Bson> pipeline = asList(
match(expr(new Document("$and",
asList(
new Document("$in", asList("$$genre", "$genres")),
new Document("$gt", asList("$imdb.rating", "$$rating"))
)))),
project(fields(include("title", "imdb.rating"), excludeId())));
Bson similarHigherRatedLookup = lookup("movies", variables, pipeline, "similar_higher_rated");

使用 group() 方法创建 $group 管道阶段,根据指定表达式对文档分组,并为每个不同分组输出一个文档。

提示

驱动程序包含“累加器”类,该类为每个支持的累加器提供静态工厂方法。

以下示例创建了一个管道阶段,该阶段按照 customerId 字段的值对文档进行分组。每个组将 quantity 字段值的总和和平均值累积到 totalQuantityaverageQuantity 字段中。

group("$customerId", sum("totalQuantity", "$quantity"), avg("averageQuantity", "$quantity"));

从服务器手册的累加器页面了解详情有关累加器操作符的更多信息。

pick-n 累加器是聚合累加操作符,在特定排序的情况下返回顶部和底部元素。使用下列生成器之一创建聚合累加操作符:

提示

仅当MongoDB 部署(包括Atlas集群)运行的是MongoDB Server 5.2 或更高版本时,您才能使用这些 pick-n 累加器执行聚合操作。

通过服务器手册的累加器页面,了解可以将累加器操作符与哪些聚合管道阶段一起使用。

minN()构建器创建$minN累加器,该累加器从包含n个分组最低值的文档中返回数据。

提示

$minN$bottomN累加器可执行类似任务。 有关每个累加器的建议用法,请参阅$minN 和 $bottomN 累加器的比较

以下示例演示了如何使用 minN() 方法返回电影的三个最低 imdb.rating 值,并按 year 进行分组:

group(
"$year",
minN(
"lowest_three_ratings",
new BsonString("$imdb.rating"),
3
));

请参阅 minN() API 文档了解更多信息。

maxN() 累加器从包含分组中 n 个最高值的文档中返回数据。

以下示例演示了如何使用 maxN() 方法返回电影的两个最高 imdb.rating 值,并按 year 进行分组:

group(
"$year",
maxN(
"highest_two_ratings",
new BsonString("$imdb.rating"),
2
));

请参阅 maxN() API 文档了解更多信息。

firstN() 累加器返回指定排序顺序的每个分组中前 n 个文档的数据。

提示

$firstN$topN累加器可执行类似任务。 有关每个累加器的建议用法,请参阅$firstN 和 $topN 累加器的比较

以下示例演示了如何使用 firstN() 方法根据电影进入舞台的顺序返回前四个电影 title 值,并按 year 进行分组:

group(
"$year",
firstN(
"first_four_movies",
new BsonString("$title"),
4
));

请参阅 firstN() API 文档了解更多信息。

lastN() 累加器返回指定排序顺序的每个分组中最后 n 个文档的数据。

以下示例演示了如何使用 lastN() 方法根据电影进入舞台的顺序显示最后三个电影 title 值,并按 year 分组:

group(
"$year",
lastN(
"last_three_movies",
new BsonString("$title"),
3
));

请参阅 lastN() API 文档了解更多信息。

top() 累加器会根据指定的排序顺序,返回分组中第一个文档的数据。

以下示例演示了如何使用 top() 方法返回基于 imdb.rating 且按 year 分组的评分最高电影的 titleimdb.rating 值。

group(
"$year",
top(
"top_rated_movie",
descending("imdb.rating"),
asList(new BsonString("$title"), new BsonString("$imdb.rating"))
));

请参阅 top() API 文档 了解更多信息。

topN() 累加器会从包含指定字段最高 n 值的文档中返回数据。

提示

$firstN$topN累加器可执行类似任务。 有关每个累加器的建议用法,请参阅$firstN 和 $topN 累加器的比较

以下示例演示如何使用 topN() 方法根据 runtime 值返回长度排名前三的电影的 titleruntime 值,并按 year 分组。

group(
"$year",
topN(
"longest_three_movies",
descending("runtime"),
asList(new BsonString("$title"), new BsonString("$runtime")),
3
));

请参阅 topN() API 文档了解更多信息。

bottom() 累加器会根据指定的排序顺序,返回分组中最后一个文档的数据。

以下示例演示了如何使用 bottom() 方法根据 runtime 值返回最短电影的 titleruntime 值,并按 year 分组。

group(
"$year",
bottom(
"shortest_movies",
descending("runtime"),
asList(new BsonString("$title"), new BsonString("$runtime"))
));

请参阅 bottom() API 文档了解更多信息。

bottomN() 累加器会从包含指定字段最低 n 值的文档中返回数据。

提示

$minN$bottomN累加器可执行类似任务。 有关每个累加器的建议用法,请参阅$minN 和 $bottomN 累加器的比较

以下示例演示了如何使用 bottomN() 方法根据 imdb.rating 值返回评分最低的两部电影的 titleimdb.rating 值,并按 year 分组:

group(
"$year",
bottomN(
"lowest_rated_two_movies",
descending("imdb.rating"),
asList(new BsonString("$title"), new BsonString("$imdb.rating")),
2
));

请参阅 bottom() API 文档了解更多信息。

使用 unwind() 方法创建 $unwind 管道阶段,从输入文档解构数组字段,为每个数组元素创建输出文档。

以下示例为 sizes 数组中的每个元素创建了文档:

unwind("$sizes");

以下示例保留大量字段缺失或值为 null 或大量为空的文档:

unwind("$sizes", new UnwindOptions().preserveNullAndEmptyArrays(true));

以下示例在名为 "position" 的字段中包含大量索引:

unwind("$sizes", new UnwindOptions().includeArrayIndex("position"));

使用 out() 方法创建 $out 管道阶段,将所有文档写入同一数据库中的指定集合。

重要

$out 阶段必须是任何聚合管道中的最后一个阶段。

以下示例将管道结果写入 authors 集合:

out("authors");

使用 merge() 方法创建 $merge 管道阶段,将所有文档合并到指定的集合中。

重要

$merge 阶段必须是任何聚合管道中的最后一个阶段。

以下示例使用默认选项将管道合并到 authors 集合中:

merge("authors");

以下示例通过一些选项将管道合并到 reporting 数据库中的 customers 集合,这些选项指定,如果 datecustomerId 都匹配,则替换文档,否则插入文档:

merge(new MongoNamespace("reporting", "customers"),
new MergeOptions().uniqueIdentifier(asList("date", "customerId"))
.whenMatched(MergeOptions.WhenMatched.REPLACE)
.whenNotMatched(MergeOptions.WhenNotMatched.INSERT));

使用 graphLookup() 方法创建 $graphLookup 管道阶段,对指定集合执行递归搜索,以将一个文档中的指定字段与另一个文档的指定字段进行匹配。

以下示例计算 contacts 集合中用户的社交网络图,以递归方式将 friends 字段的值与 name 字段进行匹配:

graphLookup("contacts", "$friends", "friends", "name", "socialNetwork");

使用 GraphLookupOptions,如果需要,您可以指定要递归的深度以及深度字段的名称。在本例中,$graphLookup 最多会递归两次,并为每个文档创建名为 degrees 的字段,其中包含递归深度信息。

graphLookup("contacts", "$friends", "friends", "name", "socialNetwork",
new GraphLookupOptions().maxDepth(2).depthField("degrees"));

使用 GraphLookupOptions,您可以指定筛选器,文档必须与该筛选器匹配,MongoDB 才能将其包含在搜索中。在此示例中,只有 hobbies 字段包含“golf”的链接才会被包括在内。

graphLookup("contacts", "$friends", "friends", "name", "socialNetwork",
new GraphLookupOptions().maxDepth(1).restrictSearchWithMatch(eq("hobbies", "golf")));

使用 sortByCount() 方法创建 $sortByCount 管道阶段,根据给定表达式对文档分组,然后按计数降序对这些分组排序。

提示

$sortByCount 阶段与带有 $sum 累加器的 $group 阶段相同,之后是 $sort 阶段。

[
{ "$group": { "_id": <expression to group on>, "count": { "$sum": 1 } } },
{ "$sort": { "count": -1 } }
]

下面的示例按照字段 x 的截断值对文档进行分组,并计算每个不同值的计数:

sortByCount(new Document("$floor", "$x"));

使用 replaceRoot() 方法创建 $replaceRoot 管道阶段,用指定文档替换每个输入文档。

以下示例将每个输入文档替换为 spanish_translation 字段中的嵌套文档:

replaceRoot("$spanish_translation");

使用 addFields() 方法创建 $addFields 管道阶段,为文档添加新字段。

提示

不想对字段包含或排除进行投影时,请使用 $addFields

以下示例会将两个新字段 ab 添加到输入文档:

addFields(new Field("a", 1), new Field("b", 2));

使用count() 方法创建 $count管道阶段,用于计算进入该阶段的文档数量,并将该值分配给指定的字段名称。如果未指定字段名称, 将默认字段名称为“计数”。count()

提示

$count 阶段相当于以下mongosh阶段:

{ "$group":{ "_id": 0, "count": { "$sum" : 1 } } }

以下示例创建了管道阶段,在名为“总计”的字段中输出传入文档的计数:

count("total");

使用 bucket() 方法创建 $bucket 管道阶段,该阶段可自动围绕预定义的边界值对数据进行分组。

以下示例创建了一个管道阶段,该阶段根据传入文档的 screenSize 字段的值(包括下边界但不包括上边界)对传入文档进行分组。

bucket("$screenSize", asList(0, 24, 32, 50, 70, 200));

使用 BucketOptions 类为指定边界之外的值指定默认存储桶,并指定其他累加器。

以下示例创建了一个管道阶段,根据传入文档的 screenSize 字段的值对传入文档进行分组,计算每个存储桶中的文档数量,将 screenSize 的值推送到名为 matches 的字段,并将任何大于“70”的屏幕尺寸捕获到一个名为“monster”的存储桶中,用于超大屏幕尺寸:

提示

驱动程序包含“累加器”类,该类为每个支持的累加器提供静态工厂方法。

bucket("$screenSize", asList(0, 24, 32, 50, 70),
new BucketOptions().defaultBucket("monster").output(sum("count", 1), push("matches", "$screenSize")));

使用 bucketAuto() 方法创建 $bucketAuto 管道阶段,自动确定每个存储桶的边界,试图将文档平均分配到指定数量的存储桶中。

以下示例创建了一个管道阶段,该阶段将尝试使用文档的 price字段的值创建文档并将其均匀分发到 10 存储桶中:

bucketAuto("$price", 10);

使用 BucketAutoOptions 类指定基于数字的首选方案来设置边界值,然后指定额外的累加器。

以下示例创建了一个管道阶段,该阶段将尝试使用文档的 price字段的值创建文档并将其均匀分发到 10 存储桶中,并将存储桶边界设置为 2 次方(2、4、 8、16、...)。它还计算每个存储桶中的文档数量,并在名为 avgPrice 的新字段中计算它们的平均值 price

提示

驱动程序包含“累加器”类,该类为每个支持的累加器提供静态工厂方法。

bucketAuto("$price", 10, new BucketAutoOptions().granularity(BucketGranularity.POWERSOF2)
.output(sum("count", 1), avg("avgPrice", "$price")));

使用 facet() 方法创建 $facet 管道阶段,允许定义并行管道。

以下示例创建了一个管道阶段,用于执行两个并行聚合:

  • 第一个聚合将传入的文档根据其 attributes.screen_size 字段分成 5 组。

  • 第二个聚合对所有制造商进行计数并返回其计数,仅限于前 5 个。

facet(new Facet("Screen Sizes",
bucketAuto("$attributes.screen_size", 5, new BucketAutoOptions().output(sum("count", 1)))),
new Facet("Manufacturer", sortByCount("$attributes.manufacturer"), limit(5)));

使用 setWindowFields() 方法创建 $setWindowFields 管道阶段,允许使用窗口操作符对集合中指定跨度的文档执行操作。

提示

窗口功能

该驱动程序包含 Windows 类,并附带用于构造窗口化计算的静态工厂方法。

以下示例创建了一个管道阶段,用于利用 rainfalltemperature 字段更精细的测量值,计算每个地点过去一个月累计的降雨量和平均气温:

Window pastMonth = Windows.timeRange(-1, MongoTimeUnit.MONTH, Windows.Bound.CURRENT);
setWindowFields("$localityId", ascending("measurementDateTime"),
WindowOutputFields.sum("monthlyRainfall", "$rainfall", pastMonth),
WindowOutputFields.avg("monthlyAvgTemp", "$temperature", pastMonth));

使用 densify() 方法创建 $densify 管道阶段,生成跨越指定时间间隔的文档序列。

提示

$densify聚合阶段需要MongoDB Server v5.1 或更高版本。

考虑从Atlas 样本天气数据集中检索到的以下文档,其中包含类似position字段的测量值,测量间隔为一小时:

Document{{ _id=5553a..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:00:00 EST 1984, ... }}
Document{{ _id=5553b..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 09:00:00 EST 1984, ... }}

假设需要创建管道阶段,对这些文档执行以下操作:

  • 每隔 15 分钟添加一个尚未存在 ts 值的文档。

  • position 字段对文档分组。

调用 densify() 聚合阶段构建器完成这些操作的过程如下:

densify(
"ts",
DensifyRange.partitionRangeWithStep(15, MongoTimeUnit.MINUTE),
DensifyOptions.densifyOptions().partitionByFields("position.coordinates"));

以下输出突出显示聚合阶段生成的文档,其中现有文档之间每 15 分钟包含 ts 值:

Document{{ _id=5553a..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:00:00 EST 1984, ... }}
Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:15:00 EST 1984 }}
Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:30:00 EST 1984 }}
Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:45:00 EST 1984 }}
Document{{ _id=5553b..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 09:00:00 EST 1984, ... }}

请参阅使软件包 API 文档密集化,了解更多信息。

使用fill()方法创建填充null和缺失字段值的$fill管道阶段。

提示

$fill聚合阶段需要MongoDB Server v5.3 或更高版本。

以下文档包含每小时的温度和气压测量值:

Document{{_id=6308a..., hour=1, temperature=23C, air_pressure=29.74}}
Document{{_id=6308b..., hour=2, temperature=23.5C}}
Document{{_id=6308c..., hour=3, temperature=null, air_pressure=29.76}}

假设您需要在文档中填充缺失的温度和气压数据点,如下所示:

  • 使用线性插值填充小时“2”的 air_pressure 字段来计算值。

  • 将缺失的 temperature 值设置为“23.6C”时间为“3”小时。

调用 fill() 聚合阶段构建器完成这些操作的过程如下:

fill(
FillOptions.fillOptions().sortBy(ascending("hour")),
FillOutputField.value("temperature", "23.6C"),
FillOutputField.linear("air_pressure")
);

下面的输出突出显示文档,其中包含由聚合阶段填充的字段:

Document{{_id=6308a..., hour=1, temperature=23C, air_pressure=29.74}}
Document{{_id=6308b..., hour=2, temperature=23.5C, air_pressure=29.75}}
Document{{_id=6308c..., hour=3, temperature=23.6C, air_pressure=29.76}}

请参阅填充软件包 API 文档,了解更多信息。

使用 search() 方法来创建 $search 管道阶段,从而指定针对一个或多个字段的全文搜索。

注意

Atlas和 Community Edition 版本要求

$search 聚合管道操作符仅适用于在运行 MongoDB v4.2 或更高版本的MongoDB Atlas集群上托管的集合,或者运行 MongoDB v8.2 或更高版本的MongoDB Community Edition集群上托管的集合。集合必须包含在MongoDB 搜索索引中。要了解有关此操作符所需设置和功能的更多信息,请参阅MongoDB 搜索文档。

以下示例创建了一个管道阶段,该阶段在 title 字段中搜索包含“Future”一词的文本:

Bson textSearch = search(
SearchOperator.text(
fieldPath("title"), "Future"));

通过搜索包API文档了解详情有关构建者的更多信息。

使用 searchMeta() 方法创建一个 $searchMeta 管道阶段,该阶段仅返回来自 MongoDB 搜索查询结果的元数据部分。

注意

Atlas和 Community Edition 版本要求

此聚合管道操作符符仅适用于运行 MongoDB v4.4.11 及更高版本的MongoDB Atlas集群,或者运行 MongoDB v8.2 或更高版本的MongoDB Community Edition集群。有关版本可用性的详细列表,请参阅 $searchMeta 上的MongoDB Atlas文档。

以下示例显示了MongoDB 搜索聚合阶段的 count 元数据:

searchMeta(
SearchOperator.near(2010, 1, fieldPath("year")));