$merge
在此页面上
$merge
将聚合管道的结果写入Atlas 集群上的临时集合。 然后, Atlas Data Federation在Atlas 集群本地运行$merge
,将数据分块合并到目标集合中。 事件$merge
操作期间发生故障,这可确保至少将部分数据写入目标集合。
在 Atlas Data Federation 中,$merge
可以:
从任何支持的联合数据库实例存储写入数据。
写入同一 Atlas 项目中的相同或不同 Atlas 集群、数据库或集合。
为了允许写入 Atlas 集群,Atlas Data Federation 为必填into
字段引入了备用语法。 Atlas Data Federation 支持所有其他字段,如$merge
中所述。
要学习;了解更多信息,请参阅$merge
管道阶段。
所需权限
Considerations
如果聚合失败, Atlas Data Federation不会回滚错误发生之前$merge
完成的任何写入。
语法
{ "$merge": { "into": { "atlas": { "projectId": "<atlas-project-ID>", "clusterName": "<atlas-cluster-name>", "db": "<atlas-database-name>", "coll": "<atlas-collection-name>" } }, "on": "<identifier field>"|[ "<identifier field1>", ...], "let": { <variables> }, "whenMatched": "replace|keepExisting|merge|fail|pipeline", "whenNotMatched": "insert|discard|fail" } }
字段
本部分介绍 Atlas Data Federation 为 into
字段提供的替代语法。
字段 | 类型 | 说明 | 必要性 |
---|---|---|---|
atlas | 对象 | 从聚合管道写入文档的位置。 | 必需 |
clusterName | 字符串 | Atlas 集群的名称。 | 必需 |
coll | 字符串 | Atlas 集群上的集合的名称。 | 必需 |
db | 字符串 | Atlas 集群上包含该集合的数据库的名称。 | 必需 |
projectId | 字符串 | 包含 Atlas 集群的项目的唯一标识符。这是包含联合数据库实例项目的 ID。如果省略,则默认为包含联合数据库实例项目的 ID。 | Optional |
要了解有关其他字段on
、 let
、 whenMatched
和whenNotMatched
的更多信息,请参阅$merge
的 MongoDB 服务器文档。
注意
要在多个字段上使用on
,必须在“on”标识符字段上创建复合唯一索引。
选项
选项 | 类型 | 说明 | 必要性 | |
---|---|---|---|---|
background | 布尔 | 标记在后台运行聚合操作。如果省略,则默认值为
如果您想要提交其他新查询,而无需等待当前运行的查询完成或断开与联合数据库实例的连接,同时查询继续在后台运行,请使用此选项。 | Optional |
解析重复的文档 ID
将文档从存档或数据存储写入 Atlas 集群时,这些文档可能具有重复的 _id
字段。本部分介绍 Atlas Data Federation 如何解决重复项,并包括解决聚合管道中的重复项的建议。
解决 Atlas Data Federation 中的重复 ID
为了解决重复项,Atlas Data Federation:
按照接收文档的顺序将文档写入 Atlas 集合
X
,直到遇到重复项。将具有重复
_id
字段的文档和所有后续文档写入新的 Atlas 集合Y
。运行指定的
$merge
阶段,将集合Y
合并到集合X
中。将生成的文档写入指定 Atlas 集群上的目标集合。
注意
Atlas Data Federation 仅解决 _id
字段中的重复值。它不会解析具有唯一索引的其他字段中的重复值。
修复重复的 ID
要修复重复的 _id
字段,可以:
在管道中包含一个
$sort
阶段,以指定 Atlas Data Federation 处理生成文档的顺序。根据流入
$merge
阶段的文档顺序,仔细选择$merge
阶段的whenMatched
和whenNotMatched
选项的值。例子
以下示例显示当
whenMatched
选项设置为keepExisting
或replace
时,Atlas Data Federation 如何在$merge
阶段解决重复项。这些示例使用以下文档:{ "_id" : 1, "state" : "FL" }, { "_id" : 1, "state" : "NJ" }, { "_id" : 2, "state" : "TX" } 假设您对上面列出的文档运行以下管道:
db.s3coll.aggregate([ { "$sort": { "_id": 1, "state": 1, } }, { "$merge": { "into": { "atlas": { "clusterName": "clustername", "db": "clusterdb", "coll": "clustercoll" } }, "on": "_id", "whenMatched": "keepExisting", "whenNotMatched": "insert" } } ]) Atlas Data Federation 将以下数据写入名为
X
和Y
的两个集合:{ "_id" : 1, "state" : "FL" } { "_id" : 1, "state" : "NJ" }, { "_id" : 2, "state" : "TX" } 然后,Atlas Data Federation 将文档从集合
Y
合并到集合X
中。对于管道中的whenMatched: keepExisting
选项,Atlas Data Federation 会保留集合X
中包含_id: 1
的现有文档。因此,具有重复项的管道的结果包含以下文档:{ "_id" : 1, "state" : "FL" }, { "_id" : 2, "state" : "TX" } 然后,Atlas Data Federation 将这些文档合并到指定 Atlas 集群上的目标集合中。
假设您对上面列出的文档运行以下管道:
db.s3coll.aggregate([ { "$sort": { "_id": 1, "state": 1, } }, { "$merge": { "into": { "atlas": { "clusterName": "clustername", "db": "clusterdb", "coll": "clustercoll" } }, "on": "_id", "whenMatched": "replace", "whenNotMatched": "insert" } } ]) Atlas Data Federation 将以下数据写入名为
X
和Y
的两个集合:{ "_id" : 1, "state" : "FL" } { "_id" : 1, "state" : "NJ" }, { "_id" : 2, "state" : "TX" } Atlas Data Federation 将集合
Y
中的文档合并到集合X
中。对于管道中的whenMatched: replace
选项,Atlas Data Federation 会将集合X
中带有_id: 1
的文档替换为集合Y
中带有_id: 1
的文档。因此,具有重复项的管道的结果包含以下文档:{ "_id" : 1, "state" : "NJ" }, { "_id" : 2, "state" : "TX" } 然后,Atlas Data Federation 将这些文档合并到指定 Atlas 集群上的目标集合中。
避免使用
whenNotMatched: discard
选项。例子
此示例展示当
whenNotMatched
选项设置为discard
时,Atlas Data Federation 如何使用以下文档解决重复项:{ "_id" : 1, "state" : "AZ" }, { "_id" : 1, "state" : "CA" }, { "_id" : 2, "state" : "NJ" }, { "_id" : 3, "state" : "NY" }, { "_id" : 4, "state" : "TX" } 假设您对上面列出的文档运行以下管道:
db.archivecoll.aggregate([ { "$sort": { "_id": 1, "state": 1, } }, { "$merge": { "into": { "atlas": { "clusterName": "clustername", "db": "clusterdb", "coll": "clustercoll" } }, "on": "_id", "whenMatched": "replace", "whenNotMatched": "discard" } } ]) Atlas Data Federation 将以下数据写入名为
X
和Y
的两个集合:{ "_id" : 1, "state" : "AZ" // gets replaced } { "_id" : 1, "state" : "CA" } { "_id" : 2, "state" : "NJ" // gets discarded } { "_id" : 3, "state" : "NY" // gets discarded } { "_id" : 4, "state" : "TX" // gets discarded } Atlas Data Federation 将集合
Y
中的文档合并到集合X
中。对于管道中的whenMatched: replace
选项,Atlas Data Federation 会将集合X
中带有_id: 1
的文档替换为集合Y
中带有_id: 1
的文档。对于管道中的whenNotMatched: discard
选项,Atlas Data Federation 会丢弃集合Y
中与集合X
中的文档不匹配的文档。因此,具有重复项的管道的结果仅包含以下文档:{ "_id" : 1, "state" : "CA" } 然后,Atlas Data Federation 会将此文档合并到指定 Atlas 集群上的目标集合中。
例子
使用以下方法合并数据: $merge
以下示例$merge
语法将结果写入Atlas 集群上名为myTestCluster
的sampleDB.mySampleData
集合。 该示例未指定项目ID ; $merge
阶段使用包含联合数据库实例的项目的ID 。
例子
1 db.mySampleData.aggregate( 2 [ 3 { 4 "$merge": { 5 "into": { 6 "atlas": { 7 "clusterName": "myTestCluster", 8 "db": "sampleDB", 9 "coll": "mySampleData" 10 } 11 }, 12 ... 13 } 14 } 15 ] 16 )
$merge
在背景运行
以下示例$merge
语法会将结果写入背景中名为myTestCluster
的Atlas 集群上的sampleDB.mySampleData
集合。 该示例未指定项目ID ; $merge
阶段使用包含联合数据库实例的项目的ID 。
例子
1 db.mySampleData.aggregate( 2 [ 3 { 4 "$merge": { 5 "into": { 6 "atlas": { 7 "clusterName": "myTestCluster", 8 "db": "sampleDB", 9 "coll": "mySampleData" 10 } 11 }, 12 ... 13 } 14 } 15 ], 16 { "background" : true } 17 )