Docs 菜单

Docs 主页开发应用程序MongoDB Manual

$merge(聚合)

在此页面上

  • 定义
  • 兼容性
  • 语法
  • 考虑因素
  • 限制
  • 举例

注意

本页介绍了 $merge阶段,该阶段将聚合管道结果输出到集合中。有关将多个文档合并为单个文档的$mergeObjects操作符,请参阅$mergeObjects

$merge

聚合管道的结果写入指定的集合。 $merge操作符必须是管道的最后一个阶段。

$merge阶段:

  • 可以输出到相同或不同数据库中的集合。

  • 可以输出到正在聚合的同一集合。有关详细信息,请参阅输出到正在聚合的同一集合。

  • $merge如果集群中的所有节点都将 featureCompatibilityVersion 设置为5.0 或更高,并且 读取偏好 允许从节点读取,则具有 阶段的管道可以在副本集从节点上运行。

    • $merge语句的读取操作会发送到从节点,而写入操作仅发生在主节点上。

    • 并非所有驱动程序版本都支持将$merge操作定位到副本集从节点。检查驱动程序文档,了解驱动程序何时添加了对从节点上运行的$merge读取操作的支持。

  • 如果输出集合不存在,则创建一个新集合。

  • 可将结果(插入新文档、合并文档、替换文档、保留现有文档、操作失败、使用自定义更新管道处理文档)并入现有集合。

  • 可输出到分片集合。输入集合也可以是分片的。

有关与$out阶段(也将聚合结果输出到集合)的比较,请参阅$merge$out比较。

注意

按需物化视图

$merge 可以将管道结果纳入现有的输出集合,而不是执行对集合完全替换。此功能允许用户创建按需物化视图,在管道运行时增量更新输出集合的内容。

有关此用例的更多信息,请参阅《按需物化视图》以及本页上的示例。

物化视图与只读视图是分开的。有关创建只读视图的信息,请参阅只读视图。

可以使用 $merge 查找托管在以下环境中的部署:

  • MongoDB Atlas :用于在云中部署 MongoDB 的完全托管服务

$merge 通过以下语法实现:

{ $merge: {
into: <collection> -or- { db: <db>, coll: <collection> },
on: <identifier field> -or- [ <identifier field1>, ...], // Optional
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

例如:

{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }

如果使用$merge的所有默认选项(包括写入同一数据库中的集合),则可以使用简化形式:

{ $merge: <collection> } // Output collection is in the same database

$merge接受包含以下字段的文档:

字段
说明
into

输出集合。指定以下任一项:

  • 将集合名称作为字符串输出到运行聚合的同一数据库中的集合。例如:

    into: "myOutput"

  • 文档中的数据库和集合名称,以输出到指定数据库中的集合。例如:

    into: { db:"myDB", coll:"myOutput" }

注意

  • 如果输出集合不存在, $merge将创建该集合:

    • 对于副本集或独立运行,如果输出数据库不存在, $merge也会创建该数据库。

    • 对于分片集群,指定的输出数据库必须已经存在。

  • 输出集合可以是分片集合。

可选。充当文档唯一标识符的一个或多个字段。标识符确定结果文档是否与输出集合中的现有文档匹配。指定以下任一项:

  • 字符串形式的单个字段名称。例如:

    on: "_id"

  • 数组中的字段组合。例如:

    on: [ "date", "customerId" ]
    数组中字段的顺序并不重要,不能多次指定同一字段。

对于指定的一个或多个字段:

  • 聚合结果文档必须包含 on 中指定的字段,除非 on 字段是 _id 字段。如果结果文档中缺少 _id 字段,MongoDB 会自动添加。

  • 指定的一个或多个字段不能包含 null 或数组值。

$merge需要一个唯一的索引,其中的键对应标识符字段。尽管索引键规范的顺序并不重要,但唯一索引必须仅包含on字段作为其键。

  • 索引必须与聚合具有相同的排序规则

  • 唯一索引可为稀疏索引。

  • 唯一索引不能为部分索引。

  • 对于已存在的输出集合,相应的索引必须已存在。

on 的默认值取决于输出集合:

  • 如果输出集合不存在,则on标识符必须是且默认为_id字段。系统会自动创建相应的唯一_id索引。

    提示

    要为不存在的集合使用不同标识符字段,可以先在所需字段上创建唯一索引来创建集合。有关示例,请参阅有关不存在的输出集合的部分

  • 如果现有输出集合未进行分片,on 标识符则默认为 _id 字段。

  • 如果现有输出集合是分片集合,则on标识符默认为所有分片键字段和_id字段。如果指定不同的on标识符,则on必须包含所有分片键字段。

可选。如果结果文档与集合中的现有文档具有相同$merge 的 指定 字段值,则 的行为。

您可以指定以下任一项:

  • 预定义的动作字符串之一:

    操作
    说明

    输出集合中的现有文档替换为匹配的结果文档。

    执行替换时,替换文档不能导致修改 _id 值,或者,如果输出集合是分片的,则不能修改分片键值。否则,操作将产生错误。

    提示

    为避免此错误,如果on字段不包含_id字段,请删除聚合结果中的_id字段以避免错误,例如在前面添加$unset阶段等。

    将现有文档保留在输出集合中。

    "merge" (默认)

    合并匹配文档(类似于 $mergeObjects 操作符)。

    • 如果结果文档包含不在现有文档中的字段,请将这些新字段添加到现有文档中。

    • 如果结果文档包含现有文档中的字段,则将现有的字段值替换为来自结果文件的值。

    例如,如果输出集合中有以下文档:

    { _id: 1, a: 1, b: 1 }

    且聚合结果中以下文档:

    { _id: 1, b: 5, z: 1 }

    则合并文档为:

    { _id: 1, a: 1, b: 5, z: 1 }

    执行合并时,合并的文档不会导致分片键值(如果输出集合已分片)或 _id 值发生修改。否则,操作将产生错误。

    提示

    为避免此错误,如果on字段不包含_id字段,请删除聚合结果中的_id字段以避免错误,例如在前面添加$unset阶段等。

    停止聚合操作并使其失败。先前文档对输出集合的任何更改都不会恢复。

  • 用于更新集合中文档的聚合管道。

    [ <stage1>, <stage2> ... ]

    该管道只能由以下阶段组成:

    管道无法修改on字段的值。例如,如果您要对字段month进行匹配,则管道无法修改month字段。

    whenMatched pipeline 可以使用 $<field> 直接访问输出集合中现有文档的字段。

    要访问聚合结果文档中的字段,请使用以下任一项:

    • 用于访问字段的内置$$new变量。具体来说,为$$new.<field> 。仅当省略let规范时, $$new变量才可用。

      注意

      从 MongoDB 4.2.2 开始,$$new 变量被保留,并且不能被覆盖。

    • let字段中的用户定义变量。

      $$<variable_name> 形式指定双美元符号 ($$) 前缀以及变量名称。例如,$$year。如果将此变量设为文档,则还能以 $$<variable_name>.<field> 形式包含文档字段。例如,$$year.month

      有关更多示例,请参阅使用变量自定义合并。

可选。指定在whenMatched 管道中使用的变量。

指定包含变量名和值表达式的文档:

{ <variable_name_1>: <expression_1>,
...,
<variable_name_n>: <expression_n> }

如果未指定,默认为{ new: "$$ROOT" } (请参阅ROOT )。 whenMatched 管道可以访问$$new变量。

注意

从 MongoDB 4.2.2 开始,$$new 变量被保留,并且不能被覆盖。

要访问whenMatched 管道中的变量,请执行以下操作:

$$<variable_name> 形式指定双美元符号 ($$) 前缀以及变量名称。例如,$$year。如果将此变量设为文档,则还能以 $$<variable_name>.<field> 形式包含文档字段。例如,$$year.month

有关示例,请参阅使用变量自定义合并。

可选。如果结果文档与输出集合中的现有文档不匹配,则$merge的行为。

您可以指定以下预定义的动作字符串之一:

操作
说明
"insert" (默认)

将此文档插入输出集合。

丢弃文档。具体来说, $merge不会将文档插入到输出集合中。

停止聚合操作并使其失败。已写入输出集合的所有更改均不会恢复。

如果聚合管道结果中的文档中不存在_id字段,则$merge阶段会自动生成该字段。

例如,在以下聚合管道中, $project会从传递给$merge的文档中排除_id字段。当$merge将这些文档写入"newCollection"时, $merge会生成新的_id字段和值。

db.sales.aggregate( [
{ $project: { _id: 0 } },
{ $merge : { into : "newCollection" } }
] )

如果指定的输出集合不存在,则$merge操作会创建一个新集合。

  • 输出集合是在$merge将第一个文档写入集合时创建的,并且立即可见。

  • 如果聚合失败,则在错误发生之前由$merge完成的任何写入都不会回滚。

注意

对于副本集或独立运行,如果输出数据库不存在, $merge也会创建该数据库。

对于分片集群,指定的输出数据库必须已经存在。

如果输出集合不存在,则$merge要求on标识符为_id字段。要为不存在的集合使用不同的on字段值,可以先在所需字段上创建唯一索引来创建集合。例如,如果输出集合newDailySales201905不存在,而您想指定salesDate字段作为on标识符:

db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } )
db.sales.aggregate( [
{ $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } },
{ $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } },
{ $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } },
{ $merge : { into : "newDailySales201905", on: "salesDate" } }
] )

$merge阶段可以输出到分片集合。当输出集合已分片时, $merge会使用_id字段和所有分片键字段作为标识符默认值。如果覆盖默认值,则on标识符必须包含所有分片键字段:

{ $merge: {
into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

例如,使用 sh.shardCollection() 方法创建新的分片集合 newrestaurants,其中 postcode 字段作为分片键。

sh.shardCollection(
"exampledb.newrestaurants", // Namespace of the collection to shard
{ postcode: 1 }, // Shard key
);

newrestaurants集合将包含按月份( date字段)和邮政编码(分片键)列出的新餐厅开业信息的文档;具体来说, on标识符为["date", "postcode"] (字段的顺序无关紧要)。由于$merge需要唯一索引,其中的键标识符字段相对应,因此请创建唯一索引(字段的顺序无关紧要): [ 1 ]

use exampledb
db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )

通过分片集合restaurants和创建的唯一索引后,可以使用$merge将聚合结果输出到此集合,匹配[ "date", "postcode" ] ,如下例所示:

use exampledb
db.openings.aggregate([
{ $group: {
_id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" },
restaurants: { $push: "$restaurantName" } } },
{ $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } },
{ $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } }
])
[1] 在传递 { unique: true } 选项时,sh.shardCollection() 方法还可以在分片键上创建唯一索引,前提是:分片键基于范围,集合为空,并且分片键上的唯一索引尚不存在。在前面的示例中,由于 on 标识符是分片键和另一个字段,因此需要单独的操作来创建相应的索引。

$merge如果聚合结果包含根据 on 规范匹配的一个或多个文档,则 可以替换输出集合中的现有文档。因此,如果聚合结果包括集合中所有现有文档的匹配文档并且您为 whenMatched 指定 “replace”$merge ,则 可以替换现有集合中的所有文档。

但是,在不考虑聚合结果的情况下,如果要替换现有集合,请使用 $out

如果$merge $merge导致现有文档的_id 值发生更改,则 出错。

提示

为避免此错误,如果on字段不包含_id字段,请删除聚合结果中的_id字段以避免错误,例如在前面添加$unset阶段等。

此外,对于分片集合,如果$merge导致现有文档的分片键值发生更改,也会生成错误。

在错误发生之前, $merge完成的任何写入都不会回滚。

如果$merge字段上使用的唯一索引在聚合过程中被删除,则不能保证聚合会被终止。如果继续聚合,则无法保证文档中没有重复的on字段值。

如果$merge尝试写入的文档违反了输出集合上的任何唯一索引,则该操作会生成错误。例如:

从 MongoDB 4开始。 2 。 2 ,如果以下所有条件对于$merge阶段为 true:

$merge 将文档直接插入到输出集合中。

MongoDB 4之前的版本。 2 。 2 ,当满足$merge阶段的这些条件时,将使用空输入文档执行whenMatched字段中指定的管道。管道中生成的文档将插入到输出集合中。

通过引入$merge ,MongoDB 提供了两个阶段$merge$out ,用于将聚合管道的结果写入集合:

  • 可以输出到相同或不同数据库中的集合。

  • 可以输出到相同或不同数据库中的集合。

  • 如果输出集合不存在,则创建一个新集合。

  • 如果输出集合不存在,则创建一个新集合。

  • 完全替换已存在的输出集合。

  • 可输出到分片集合。输入集合也可以是分片的。

  • 无法输出到分片集合。但是,可以对输入集合进行分片。

  • 对应 SQL 语句:

    • MERGE.

    • INSERT INTO T2 SELECT FROM T1.

    • SELECT INTO T2 FROM T1.

    • 创建/刷新物化视图。

  • 对应 SQL 语句:

    • INSERT INTO T2 SELECT FROM T1.

    • SELECT INTO T2 FROM T1.

警告

$merge 输出到正在聚合的同一集合时,文档可能会被多次更新,或者操作可能会导致无限循环。当$merge 执行的更新更改了磁盘上存储的文档的物理位置时,就会出现此行为。当文档的物理位置发生变化时,$merge 可能会将其视为全新文档,从而进行更多更新。有关此行为的更多信息,请参阅 万圣节问题。

$merge可以输出到正在聚合的同一集合。您还可以输出到管道其他阶段中出现的集合,例如$lookup

限制
说明
聚合管道不能在$merge 事务 内使用 。
聚合管道不能使用$merge输出到时间序列集合。
与物化视图分开
视图定义不能包含$merge阶段。如果视图定义包含嵌套管道(例如,视图定义包含$facet阶段),则此$merge阶段限制也适用于嵌套管道。
$lookup 阶段
$lookup阶段的嵌套管道不能包含$merge阶段。
$facet 阶段
$facet阶段的嵌套管道不能包含$merge阶段。
$unionWith 阶段
$unionWith阶段的嵌套管道不能包含$merge阶段。
"linearizable" 读关注 (read concern)

$merge阶段不能与读关注"linearizable" 一起使用。也就是说,如果您为"linearizable" db.collection.aggregate()指定$merge 读关注,则不能在管道中包含 阶段。

如果输出集合不存在, $merge将创建该集合。

例如,zoo 数据库中的 salaries 集合填充有员工工资和部门历史:

db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
])

您可以使用$group$merge阶段,首先根据salaries集合中当前的数据创建名为budgets的集合(在reporting数据库中):

注意

对于副本集或独立运行部署,如果输出数据库不存在, $merge也会创建该数据库。

对于分片集群部署,指定输出数据库必须已经存在。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
  • $group 阶段以按 fiscal_yeardept 对这些工资进行分组。

  • $merge阶段将前一个$group阶段的输出写入reporting数据库中的budgets集合。

要查看新 budgets 集合中的文档:

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

budgets 集合包含以下文档:

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

提示

另请参阅:

以下示例将使用上一示例中的集合。

示例 salaries 集合包含员工工资和部门历史:

{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }

示例 budgets 集合包含年度累积预算:

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

在本财政年度(本例中为 2019),salaries 集合中添加了新员工,并为下一年度预分配了新员工人数:

db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 },
{ "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 },
{ "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 },
{ "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 }
])

要更新 budgets 集合以反映新的工资信息,使用以下聚合管道:

  • $match 阶段,查找 fiscal_year 大于或等于 2019 的所有文档:

  • $group 阶段以按 fiscal_yeardept 对这些工资进行分组。

  • $merge将结果集写入budgets集合,同时替换具有相同_id值的文档(在本示例中,替换为包含财政年度和部门的文档)。对于集合中没有匹配项的文档, $merge会插入新文档。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match : { fiscal_year: { $gte : 2019 } } },
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )

聚合运行后,查看 budgets 集合中的文档:

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

budgets 集合包含 2019 财年的新薪资数据,并添加 2020 财年的新文档:

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
{ "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }

提示

另请参阅:

为确保$merge不会覆盖集合中的现有数据,请将whenMatched设置为keepExistingfail。

zoo 数据库中的示例 salaries 集合包含员工薪资和部门历史记录:

{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }

reporting 数据库中的集合 orgArchive 包含过去财政年度的部门组织历史记录。不得修改存档记录。

{ "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }

orgArchive集合具有针对 fiscal_yeardept 字段的唯一复合索引。具体而言,同一财政年度与部门的组合最多应有一条记录:

db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )

在当前财经年度结束时(本例中为 2019),salaries 集合包含以下文档:

{ "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 }
{ "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 }
{ "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 }
{ "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 }
{ "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 }
{ "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 }
{ "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 }
{ "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 }
{ "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 }
{ "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 }
{ "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
{ "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }

要更新 orgArchive 集合以包含刚刚结束的财政年度 2019,请使用以下聚合管道:

  • $match 阶段,查找 fiscal_year 等于 2019 的所有文档。

  • $group 阶段以按 fiscal_yeardept 对这些员工进行分组。

  • $project阶段抑制_id字段并添加单独的deptfiscal_year字段。当文档传递给$merge时, $merge会自动为文档生成新的_id字段。

  • $merge将结果集写入orgArchive

    $merge阶段匹配deptfiscal_year字段以及fails的文档(匹配时)。也就是说,如果同一部门和财政年度的文档已经存在,则$merge出错。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match: { fiscal_year: 2019 }},
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } },
{ $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } },
{ $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } }
] )

操作完成后,orgArchive 集合包含以下文档:

{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }

如果orgArchive集合已包含部门"A"和/或"B"的2019文档,则聚合因重复键错误而失败。但是,在错误发生之前插入的任何文档都不会回滚。

如果为匹配文档指定keepExisting ,则聚合不会影响匹配文档,也不会出现重复键错误。同样,如果指定替换,则操作不会失败;但是,该操作将替换现有文档。

默认情况下,如果聚合结果中的文档与集合中的文档匹配,则$merge阶段会合并这些文档。

示例集合 purchaseorders 按季度和区域填充了采购订单信息:

db.purchaseorders.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
{ _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
{ _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
] )

另一示例集合 reportedsales 则包含按季度和地区报告的销售信息:

db.reportedsales.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
] )

假设出于报告目的,您希望根据以下格式按季度查看数据:

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

您可以使用$merge合并来自purchaseorders集合和reportedsales集合的结果,以创建新的集合quarterlyreport

要创建 quarterlyreport 集合,可以使用以下管道:

db.purchaseorders.aggregate( [
{ $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
第一个阶段:

$group 阶段按季度分组,并使用 $sumqty 字段添加到新的 purchased 字段中。例如:

要创建 quarterlyreport 集合,您可以使用此管道:

{ "_id" : "2019Q2", "purchased" : 1700 }
{ "_id" : "2019Q1", "purchased" : 1200 }
第二阶段:
$merge阶段将文档写入同一数据库中的quarterlyreport集合。如果该阶段在集合中找到与_id字段匹配的现有文档,则该阶段会合并匹配的文档。否则,该阶段将插入文档。对于初始创建,所有文档都不应匹配。

要查看集合中的文档,运行以下操作:

db.quarterlyreport.find().sort( { _id: 1 } )

该集合包含以下文档:

{ "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }

同样,针对 reportedsales 集合运行以下聚合管道,将销售结果合并到 quarterlyreport 集合中。

db.reportedsales.aggregate( [
{ $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
第一个阶段:

$group 阶段按季度分组,并使用 $sumqty 字段添加到新的 sales 字段中。例如:

{ "_id" : "2019Q2", "sales" : 500 }
{ "_id" : "2019Q1", "sales" : 1950 }
第二阶段:
$merge阶段将文档写入同一数据库中的quarterlyreport集合。如果该阶段在集合中找到_id字段(季度)匹配的现有文档,则该阶段会合并匹配的文档。否则,该阶段将插入文档。

要在合并数据后查看 quarterlyreport 集合中的文档,请运行以下操作:

db.quarterlyreport.find().sort( { _id: 1 } )

该集合包含以下文档:

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

当文档匹配时, $merge可以使用自定义更新管道whenMatched 管道可以包含以下阶段:

示例集合 votes 包含每日投票记录。创建包含以下文档的集合:

db.votes.insertMany( [
{ date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
{ date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
{ date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
{ date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
] )

另一个示例集合 monthlytotals 包含最新的每月投票总数。使用以下文档创建集合:

db.monthlytotals.insertOne(
{ "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 }
)

每天结束时,将当天的选票插入 votes 集合中:

db.votes.insertOne(
{ date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
)

您可以将$merge与自定义管道一起使用,以更新集合monthlytotals中的现有文档:

db.votes.aggregate([
{ $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
{ $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
{ $merge: {
into: "monthlytotals",
on: "_id",
whenMatched: [
{ $addFields: {
thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] },
thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
} } ],
whenNotMatched: "insert"
} }
])
第一个阶段:

$match 阶段会查找特定日期的投票。例如:

{ "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }
第二阶段:

$project 阶段将 _id 字段设置为年月字符串。例如:

{ "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }
第三阶段:

$merge阶段将文档写入同一数据库中的monthlytotals集合。如果该阶段在集合中找到与_id字段匹配的现有文档,则该阶段将使用管道添加thumbsup投票和thumbsdown投票。

  • 该管道不能直接访问结果文档中的字段。为了访问结果文档中的 thumbsup 字段和 thumbsdown 字段,该管道使用 $$new 变量;即 $$new.thumbsup$new.thumbsdown

  • 该管道可直接访问集合中现有文档内的 thumbsupthumbsdown 字段;即 $thumbsup$thumbsdown

生成的文档将替换现有文档。

要在合并操作后查看 monthlytotals 集合中的文档,请运行以下操作:

db.monthlytotals.find()

集合包含以下文档:

{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }

您可以在$merge阶段whenMatched字段使用变量。必须先定义变量,然后才能使用。

使用以下一种或两种方式定义变量:

要在whenMatched 中使用变量,请执行以下操作:

$$<variable_name> 形式指定双美元符号 ($$) 前缀以及变量名称。例如,$$year。如果将此变量设为文档,则还能以 $$<variable_name>.<field> 形式包含文档字段。例如,$$year.month

下面的标签页展示在合并阶段、聚合命令或两者中定义变量时的行为。

← $match(聚合)