定义
注意
本页介绍了$merge 阶段,该阶段将聚合管道结果输出到集合。有关将多个文档合并为单个文档的$mergeObjects 操作符,请参阅$mergeObjects 。
$merge将聚合管道的结果写入指定的集合。
$merge操作符必须是管道的最后一个阶段。$merge阶段:可以输出到相同或不同数据库中的集合。
可以输出到正在聚合的同一集合。有关详细信息,请参阅 输出到正在聚合的同一集合。
在 聚合管道使用
$merge$out阶段时,请考虑以下几点:从MongoDB 5.0 开始,如果集群中的所有节点都将 featureCompatibilityVersion 设置为
5.0或更高,且读取偏好允许读取从节点,那么具有$merge阶段的管道就可以在副本集从节点上运行。在早期的 MongoDB 版本中,具有
$out或$merge阶段的管道始终在主节点上运行,并且不考虑读取偏好。
如果输出集合不存在,则创建一个新集合。
可将结果(插入新文档、合并文档、替换文档、保留现有文档、操作失败、使用自定义更新管道处理文档)并入现有集合。
可输出到分片集合。输入集合也可以是分片的。
有关与同样将聚合结果输出到集合中的
$out阶段的比较,请参阅$merge和$out比较。
兼容性
可以使用 $merge 查找托管在以下环境中的部署:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
语法
$merge 通过以下语法实现:
{ $merge: { into: <collection> -or- { db: <db>, coll: <collection> }, on: <identifier field> -or- [ <identifier field1>, ...], // Optional let: <variables>, // Optional whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional whenNotMatched: <insert|discard|fail> // Optional } }
例如:
{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
如果使用 $merge 的所有默认选项(包括写入同一数据库中的集合),则可以使用简化形式:
{ $merge: <collection> } // Output collection is in the same database
$merge 阶段采用包含以下字段的文档:
字段 | 说明 | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
可选。 充当文档唯一标识符的一个或多个字段。 该标识符确定结果文档是否与输出集合中的现有文档匹配。指定以下任一项:
对于指定的一个或多个字段:
on 的默认值取决于输出集合: | |||||||||||
可选。如果结果文档与集合中的现有文档具有相同的指定字段值,则 您可以指定以下任一项:
| |||||||||||
可选。 指定在 whenMatched管道中使用的变量。 指定包含变量名和值表达式的文档: 如果未指定,默认为 要访问权限whenMatched管道中的变量,请执行以下操作: 以 | |||||||||||
Considerations
_id 字段生成
如果聚合管道结果中的文档中不存在 _id字段,则 $merge 阶段会自动生成该字段。
示例,在以下聚合管道中,$project _id从传递给 的文档中排除$merge 字段。当$merge 将这些文档写入"newCollection" 时,$merge 会生成新的_id 字段和值。
db.movies.aggregate( [ { $project: { _id: 0 } }, { $merge : { into : "newCollection" } } ] )
如果输出集合不存在,则创建一个新集合
如果指定的输出集合不存在,则 $merge 操作会创建一个新集合。
输出集合是在
$merge将第一个文档写入集合时创建的,并且立即可见。如果聚合失败,则在错误发生之前由
$merge完成的任何写入都不会回滚。
如果输出集合不存在,则$merge 要求 on 标识符为_id 字段。要为不存在的集合使用不同的on 字段值,可以先在所需字段上创建唯一索引来创建集合。示例,如果输出集合newDailyCommentCount 不存在,而您想将commentDate 字段指定为 on 标识符:
db.newDailyCommentCount.createIndex( { commentDate: 1 }, { unique: true } ) db.comments.aggregate( [ { $match: { date: { $gte: new Date("2002-01-01"), $lt: new Date("2002-02-01") } } }, { $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, count: { $sum: 1 } } }, { $project: { _id: 0, commentDate: { $toDate: "$_id" }, count: 1 } }, { $merge : { into : "newDailyCommentCount", on: "commentDate" } } ] )
输出到分片集合
$merge阶段可以输出到分片的集合。当输出集合为分片的,$merge _id会使用 字段和所有分片键字段作为默认的标识符。如果覆盖默认,则 on 标识符必须包含所有分片键字段:
{ $merge: { into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" }, on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields let: <variables>, // Optional whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional whenNotMatched: <insert|discard|fail> // Optional } }
例如,使用 sh.shardCollection() 方法创建新的分片集合 moviesByYearAndRating,其中 rated 字段作为分片键。
sh.shardCollection( "sample_mflix.moviesByYearAndRating", // Namespace of the collection to shard { rated: 1 }, // Shard key );
moviesByYearAndRating集合将包含按年份(year 字段)和内容分级(分片键)列出的电影统计信息文档;具体来说,on 标识符为["year", "rated"] (字段的顺序无关紧要)。由于$merge 1需要唯一索引,其中的键与标识符字段相对应,因此请创建唯一索引(字段的顺序无关紧要):[]
db.moviesByYearAndRating.createIndex( { rated: 1, year: 1 }, { unique: true } )
通过分片的集合moviesByYearAndRating 和创建的唯一索引后,您可以使用 $merge 将聚合结果输出到此集合,匹配 [ "year", "rated" ],如下示例所示:
db.movies.aggregate( [ { $match: { rated: { $ne: null }, year: { $ne: null } } }, { $group: { _id: { year: "$year", rated: "$rated" }, movieCount: { $sum: 1 } } }, { $project: { _id: 0, year: "$_id.year", rated: "$_id.rated", movieCount: 1 } }, { $merge: { into: "moviesByYearAndRating", "on": [ "year", "rated" ], whenMatched: "replace", whenNotMatched: "insert" } } ] )
| [1] | 在传递 { unique: true
} 选项时,sh.shardCollection() 方法还可以在分片键上创建唯一索引,前提是:分片键基于范围,集合为空,并且分片键上的唯一索引尚不存在。在前面的示例中,由于 on 标识符是分片键和另一个字段,因此需要单独的操作来创建相应的索引。 |
替换文档 ($merge) 与替换集合 ($out)
$merge 如果聚合结果包含根据...规范匹配的一个或多个文档,则可以替换输出集合中的现有文档。因此,如果聚合结果包括集合中所有现有文档的匹配文档并且您为$merge whenMatched 指定“replace”,则 可以替换现有集合中的所有文档。
但是,在不考虑聚合结果的情况下,如果要替换现有集合,请使用 $out。
现有文档以及 _id 和分片键值
如果 $merge 导致现有文档的 _id 值发生更改,则 $merge 出错。
唯一索引约束
如果$merge 在字段上使用的唯一索引在聚合过程中被删除,则无法保证聚合会被终止。如果继续聚合,则无法保证文档中没有重复的on 字段值。
如果 $merge 尝试写入的文档违反了输出集合上的任何唯一索引,则该操作会生成错误。示例:
模式验证
如果您的集合使用模式验证并将 validationAction 设置为 error,则插入无效文档或使用 $merge 更新具有无效值的文档会抛出 MongoServerError,并且该文档不会写入目标集合。如果有多个无效文档,则只有出现的第一个无效文档会引发错误。所有有效文档都写入目标集合,所有无效文档都会写入失败。
whenMatched 管道行为
如果 $merge 阶段的以下所有条件均成立,则 $merge 会将文档直接插入到输出集合中:
whenMatched 的值是一个聚合管道,
whenNotMatched 的值为
insert,并且输出集合中没有匹配的文档,
$merge 将文档直接插入到输出集合中。
$merge 和 $out 比较
通过引入$merge , MongoDB提供了两个阶段$merge 和$out ,用于聚合管道的结果写入集合:
$merge | |
|---|---|
|
|
|
|
|
|
|
|
|
|
输出到正在聚合的同一集合
警告
当 $merge 输出到正在聚合的同一集合时,文档可能会被多次更新,或者操作可能会导致无限循环。当$merge 执行的更新更改了磁盘上存储的文档的物理位置时,就会出现此行为。当文档的物理位置发生变化时,$merge 可能会将其视为全新文档,从而导致更多更新。有关此行为的更多信息,请参阅万圣节问题。
限制
限制 | 说明 |
|---|---|
聚合管道不能在ACID 事务内使用 | |
聚合管道不能使用 | |
Separate from materialized view | 视图定义不能包含 |
| |
| |
|
|
|
|
示例
本页上的示例使用sample_mflix示例数据集中的数据。有关如何将此数据集加载到自管理MongoDB 部署中的详细信息,请参阅加载示例数据集。如果对示例数据库进行了任何修改,则可能需要删除并重新创建数据库才能运行本页上的示例。
按需物化视图:初始创建
如果输出集合不存在,$merge 则会创建集合。
注意
对于副本集或独立运行部署,如果输出数据库不存在,$merge 也会创建该数据库。
对于分片集群部署,指定输出数据库必须已经存在。
您可以使用$group 和$merge 阶段创建movieRatingSummary 集合,按发布年份和内容分级汇总广受好评的电影:
db.movies.aggregate( [ { $match: { metacritic: 100, rated: { $ne: null }, year: { $lte: 1972 } } }, { $group: { _id: { year: "$year", rated: "$rated" }, count: { $sum: 1 } } }, { $merge : { into: "movieRatingSummary", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
该管道使用以下阶段:
$match阶段,过滤通过 1972 上映且带有内容分级的广受好评的电影$group阶段按year和 对影片进行群组rated$merge阶段,用于将前一个 阶段的输出写入$groupmovieRatingSummarysample_mflix数据库中的 集合
要查看新 movieRatingSummary 集合中的文档:
db.movieRatingSummary.find().sort( { _id: 1 } )
[ { _id: { year: 1939, rated: 'PASSED' }, count: 1 }, { _id: { year: 1962, rated: 'PG' }, count: 1 }, { _id: { year: 1963, rated: 'PG' }, count: 1 }, { _id: { year: 1970, rated: 'R' }, count: 1 }, { _id: { year: 1972, rated: 'R' }, count: 1 } ]
提示
按需物化视图:更新/替换数据
要更新上一示例中的 movieRatingSummary集合以包含 1963 及之后广受好评的电影,此聚合管道使用以下阶段:
$match阶段查找所有metacritic: 100、内容分级和发布年份大于或等于1963的电影。$group阶段按year和 对影片进行群组。rated$merge将结果设立写入movieRatingSummary集合,同时替换具有相同_id值的文档。对于集合中没有匹配项的文档,$merge会插入新文档。
db.movies.aggregate( [ { $match: { metacritic: 100, rated: { $ne: null }, year: { $gte: 1963 } } }, { $group: { _id: { year: "$year", rated: "$rated" }, count: { $sum: 1 } } }, { $merge : { into: "movieRatingSummary", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
聚合运行后,查看 movieRatingSummary集合中的文档:
db.movieRatingSummary.find().sort( { _id: 1 } )
[ { _id: { year: 1939, rated: 'PASSED' }, count: 1 }, { _id: { year: 1962, rated: 'PG' }, count: 1 }, { _id: { year: 1963, rated: 'PG' }, count: 1 }, { _id: { year: 1970, rated: 'R' }, count: 1 }, { _id: { year: 1972, rated: 'R' }, count: 1 }, { _id: { year: 1982, rated: 'R' }, count: 1 }, { _id: { year: 2014, rated: 'R' }, count: 1 } ]
提示
仅插入新数据
要确保$merge 不会覆盖集合中的现有数据,请将 whenMatched设立为 keepExisting 或 fail。
sample_mflix数据库中的集合movieArchive 包含每个发布年份的广受好评的电影的历史记录。
movieArchive集合在 year字段上具有唯一索引。每个发布年至多应有一条记录:
db.movieArchive.createIndex( { year: 1 }, { unique: true } )
此聚合管道使用 movies集合中的数据更新 movieArchive集合,以包括从 1963 开始广受好评的电影。该管道使用以下阶段:
$match阶段查找所有具有metacritic: 100、内容分级和 的电影。year >= 1963$group阶段按 对电影标题进行群组。year$project阶段抑制_id字段并将year提升为顶级字段。当文档传递到$merge时,$merge会自动为文档生成新的_id字段。$merge将结果设立写入movieArchive。
db.movies.aggregate( [ { $match: { metacritic: 100, rated: { $ne: null }, year: { $gte: 1963 } } }, { $group: { _id: "$year", titles: { $push: "$title" } } }, { $project: { _id: 0, year: "$_id", titles: 1 } }, { $merge : { into: "movieArchive", on: "year", whenMatched: "fail" } } ] )
如果movieArchive 集合已包含1963 –2014 范围内任何年份的文档,则聚合会因重复键错误而失败。但是,管道不会回滚在错误发生之前插入的任何文档。
如果为匹配文档指定 keepExisting,则聚合不会影响匹配文档,也不会出现重复键错误。同样,如果指定替换,操作不会失败;但是,该操作会替换现有文档。
合并多个集合的结果
默认下,如果聚合结果中的文档与集合中的文档匹配,则$merge 阶段会合并这些文档。
您可以使用 $merge 合并来自 movies集合和 comments集合的结果,以创建新的集合yearlyStats。
要创建 yearlyStats集合,运行以下管道:
db.movies.aggregate( [ { $match: { metacritic: 100, rated: { $ne: null }, year: { $gte: 1970, $lte: 1972 } } }, { $group: { _id: "$year", movieCount: { $sum: 1 } } }, { $merge : { into: "yearlyStats", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } } ])
- 第一个阶段:
$matchmetacritic: 100阶段筛选内容分级在1970 和 之间上映的广受好评的电影 ()。1972- 第二阶段:
$group阶段按year分组并将电影计入新的movieCount字段。- 第三阶段:
$merge阶段将文档写入同一数据库中的yearlyStats集合。如果该阶段在集合中找到与_id字段匹配的现有文档,则该阶段会合并匹配的文档。否则,该阶段将插入文档。对于初始创建,没有匹配的文档。
要查看集合中的文档,运行以下操作:
db.yearlyStats.find().sort( { _id: 1 } )
[ { _id: 1970, movieCount: 1 }, { _id: 1972, movieCount: 1 } ]
同样,针对 comments集合运行以下聚合管道,将评论计数合并到 yearlyStats集合中。
db.comments.aggregate( [ { $match: { date: { $gte: new Date("1970-01-01"), $lt: new Date("1973-01-01") } } }, { $group: { _id: { $year: "$date" }, commentCount: { $sum: 1 } } }, { $merge : { into: "yearlyStats", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } } ])
- 第一个阶段:
$match阶段筛选1970 和1972 之间发布的评论。- 第二阶段:
$groupdatecommentCount阶段按从评论 中提取的年份进行分组,并将评论计数到新的 字段中。- 第三阶段:
$merge阶段将文档写入同一数据库中的yearlyStats集合。如果该阶段在集合中找到与_id字段(年份)匹配的现有文档,则该阶段会合并匹配的文档。否则,该阶段将插入文档。
要在合并数据后查看 yearlyStats 集合中的文档,请运行以下操作:
db.yearlyStats.find().sort( { _id: 1 } )
[ { _id: 1970, movieCount: 1, commentCount: 889 }, { _id: 1971, commentCount: 825 }, { _id: 1972, movieCount: 1, commentCount: 863 } ]
使用管道自定义合并
$merge当文档匹配时, 可以使用自定义更新管道。 whenMatched管道可以包含以下阶段:
$addFields及其别名$set$replaceRoot及其别名$replaceWith
集合monthlyCommentTotals 跟踪每月评论的运行计数。
每天,sample_mflix.comments集合中都会有新评论。以下聚合管道使用当天的评论计数更新每月总数:
db.comments.aggregate([ { $match: { date: { $gte: new Date("1970-01-15"), $lt: new Date("1970-01-16") } } }, { $group: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, count: { $sum: 1 } } }, { $merge: { into: "monthlyCommentTotals", on: "_id", whenMatched: [ { $addFields: { count: { $add: [ "$count", "$$new.count" ] } } } ], whenNotMatched: "insert" } } ])
- 第一个阶段:
$match阶段会查找在 1 月15 发布的所有评论。1970- 第二阶段:
$group阶段按年月对匹配评论进行分组并对其进行计数。- 第三阶段:
$mergemonthlyCommentTotals阶段会将文档写入 集合。如果该阶段在集合中找到与_id字段匹配的现有文档,则该阶段将使用管道将该天的count添加到现有的月度总计中。此管道无法直接访问权限结果文档中的字段。为了访问权限结果文档中的
count字段,管道使用$$new变量;即$$new.count。此管道可直接访问权限集合中现有文档的
count字段;即$count。
生成的文档将替换现有文档。
要在合并操作后查看 monthlyCommentTotals 集合中的文档,请运行以下操作:
db.monthlyCommentTotals.find()
[ { _id: '1970-01', count: 71 } ]
使用变量自定义合并
您可以在$merge 阶段使用 whenMatched字段变量。必须先定义变量,然后才能使用。
使用以下一种或两种方式定义变量:
要在 whenMatched 中使用变量,请执行以下操作:
以 $$<variable_name> 形式指定双美元符号 ($$) 前缀以及变量名称。例如,$$year。如果将此变量设为文档,则还能以 $$<variable_name>.<field> 形式包含文档字段。例如,$$year.month。
下面的标签页展示在合并阶段、聚合命令或两者中定义变量时的行为。
使用在合并阶段定义的变量
您可以在$merge 阶段 let 中定义变量,并在 whenMatched字段中使用这些变量。
如下示例:
使用
movies集合中的电影作为movieDetails集合的种子运行
aggregateyear命令,该命令在$mergelet 中定义一个 变量,并使用movieDetailswhenMatched 将年份添加到retrieves the
movieDetailsdocument
db.movies.aggregate( [ { $match: { title: "The Godfather" } }, { $limit: 1 }, { $project: { title: 1 } }, { $merge: { into: "movieDetails", whenNotMatched: "insert" } } ] ) db.runCommand( { aggregate: db.movieDetails.getName(), pipeline: [ { $merge: { into: db.movieDetails.getName(), let : { year: "2023" }, whenMatched: [ { $addFields: { "addedYear": "$$year" } } ] } } ], cursor: {} } ) db.movieDetails.find()
[ { _id: ..., title: 'The Godfather', addedYear: '2023' } ]
使用聚合命令中定义的变量
版本 5.0 中的新增功能。
您可以在aggregate 命令 let 中定义变量,并在$merge 阶段 whenMatched字段中使用这些变量。
如下示例:
使用
movies集合中的电影作为movieDetails集合的种子运行
aggregate命令,该命令会在aggregate命令 let 中定义year变量,并使用 whenMatched 将年份添加到movieDetails中retrieves the
movieDetailsdocument
db.movies.aggregate( [ { $match: { title: "The Godfather" } }, { $limit: 1 }, { $project: { title: 1 } }, { $merge: { into: "movieDetails", whenNotMatched: "insert" } } ] ) db.runCommand( { aggregate: db.movieDetails.getName(), pipeline: [ { $merge: { into: db.movieDetails.getName(), whenMatched: [ { $addFields: { "addedYear": "$$year" } } ] } } ], cursor: {}, let : { year: "2023" } } ) db.movieDetails.find()
[ { _id: ..., title: 'The Godfather', addedYear: '2023' } ]
使用在合并阶段和聚合命令中定义的变量
您可以在$merge 阶段以及(从MongoDB5.0 开始)aggregate 命令中定义变量。
如果在$merge 阶段和aggregate 命令中定义了两个同名的变量,则使用$merge 阶段变量。
在此示例中,管道使用year: "2023" 而不是year: "2019"aggregate 命令变量:
db.movies.aggregate( [ { $match: { title: "The Godfather" } }, { $limit: 1 }, { $project: { title: 1 } }, { $merge: { into: "movieDetails", whenNotMatched: "insert" } } ] ) db.runCommand( { aggregate: db.movieDetails.getName(), pipeline: [ { $merge: { into: db.movieDetails.getName(), let : { year: "2023" }, whenMatched: [ { $addFields: { "addedYear": "$$year" } } ] } } ], cursor: {}, let : { year: "2019" } } ) db.movieDetails.find()
[ { _id: ..., title: 'The Godfather', addedYear: '2023' } ]
以下 Movie 类对此示例中使用的文档进行建模:
[] public class Movie { [] public ObjectId Id { get; set; } [] public string Title { get; set; } = null!; [] public int? Year { get; set; } [] public int? Runtime { get; set; } [] public string? Rated { get; set; } [] public int Metacritic { get; set; } [] public string? Plot { get; set; } [] public string? Type { get; set; } [] public string[]? Cast { get; set; } [] public string[]? Directors { get; set; } [] public string[]? Writers { get; set; } [] public ImdbData? Imdb { get; set; } }
要使用MongoDB .NET/C# 驱动程序将 $merge 阶段添加到聚合管道,请对 PipelineDefinition 对象调用 Merge() 方法。
调用 Merge() 方法时,必须传递 MergeStageOptions 类的实例。通过该对象,您可以指定 $merge 阶段的选项,例如如何处理匹配文档。
以下示例创建了一个管道阶段,用于将管道中的文档合并到 movies集合中。MergeStageOptions对象指定以下选项:
OnFieldNames选项指定操作应使用"_id"字段来识别匹配的文档。WhenMatched选项指定,如果源集合中的文档与目标集合中的文档匹配,则操作将替换匹配的文档。WhenNotMatched选项指定,如果源集合中的文档与目标集合中的文档不匹配,则该操作将该文档插入到目标集合中。
var pipeline = new EmptyPipelineDefinition<Movie>() .Merge(_targetCollection, new MergeStageOptions<Movie>() { OnFieldNames = new List<string>() { "_id" }, WhenMatched = MergeStageWhenMatched.Replace, WhenNotMatched = MergeStageWhenNotMatched.Insert, });
{ "_id": "...", "title": "Back to the Future", "metacritic": 96 } { "_id": "...", "title": "Jurassic Park", "metacritic": 68 } { "_id": "...", "title": "The Shawshank Redemption", "metacritic": 80 }
本页上的 Node.js 示例使用 Atlas 示例数据集中的 sample_mflix数据库。要学习如何创建免费的MongoDB Atlas 集群并加载示例数据集,请参阅MongoDB Node.js驱动程序文档中的入门。
要使用MongoDB Node.js驱动程序将 $merge 阶段添加到聚合管道,请在管道对象中使用 $merge操作符。
以下示例创建了一个管道阶段,用于将管道中的文档合并到 movies集合中。该示例包括以下字段:
on选项指定操作应使用"_id"和"title"字段在源集合和movies集合中查找匹配文档。whenMatched选项指定,如果源集合中的文档与movies集合中的文档匹配,则替换movies集合中的文档。whenNotMatched选项指定,如果源集合中的文档与movies集合中的文档不匹配,则操作将该文档插入到movies集合中。
然后,该示例运行聚合管道:
const pipeline = [ { $merge: { into: "movies", on: ["_id", "title"], whenMatched: "replace", whenNotMatched: "insert" } } ]; const cursor = collection.aggregate(pipeline); return cursor;