定义
$merge 阶段在 Connection Registry 中指定要将消息写入到的连接。连接必须是Atlas连接。
$merge 管道阶段采用以下原型形式:
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>", "whenNotMatched": "insert | discard | expression", "parallelism": <integer> } }
语法
Atlas Stream Processing 版本的 $merge 使用与 Atlas Data Federation 版本相同的大部分字段。Atlas Stream Processing 还使用以下字段,这些字段要么是其实现 $merge 所独有的,要么经过修改以适应其实现。要进一步了解与 Atlas Data Federation 共享的字段 $merge,请参阅 $merge 语法。
字段 | 必要性 | 说明 |
|---|---|---|
| 必需 | 简化以反映仅支持 要了解更多信息,请参阅对 Atlas Data Federation |
| Optional | 与 Atlas Data Federation 当设置为 如果您使用动态表达式值,它必须解析为以下字符串之一:
|
| Optional | 与 Atlas Data Federation 如果您使用动态表达式值,它必须解析为以下字符串之一:
|
| 可选的 | 用于分配写入操作的线程数量。必须是介于 如果您为 |
行为
限制
$merge 必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$merge阶段。
针对分片集合,on 字段对 $merge 有特殊要求。要了解更多信息,请参阅 $merge 语法。
如果您为 into.coll 或 into.db 使用动态表达式值,则不能将 parallelism 的值设置为大于 1。
$merge 无法写入时间序列集合。要将文档写入时间序列集合,请使用 $emit 阶段。
您需要具有Atlas管理员角色才能在分片的集合上使用 $merge。
动态表达式
您可以使用动态表达式作为以下字段的值:
into.dbinto.coll
这使您的流处理器能够逐条消息将消息写入不同的目标 Atlas collection。
例子
您有一个事务事件流,它会生成以下形式的消息:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
要将其中每一个分类到不同的 Atlas 数据库和collection中,您可以编写以下$merge阶段:
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" } }
此$merge阶段:
将
Very Important Industries消息写入名为VIP.subscription的 Atlas collection。将
N. E. Buddy消息写入名为employee.requisition的 Atlas collection。将
Khan Traktor消息写入名为contractor.billableHours的 Atlas collection。
您只能使用计算结果为字符串的动态表达式。 有关动态表达式的更多信息,请参阅表达式操作符。
如果您使用动态表达式指定数据库或集合,但是 Atlas Stream Processing 无法计算给定消息的表达式,Atlas Stream Processing 会将该消息发送到死信队列(如果已配置)并处理后续消息。如果未配置死信队列,则 Atlas Stream Processing 会完全跳过该消息并处理后续消息。
保存来自 Kafka 主题的数据
要将来自多个 Apache Kafka 主题的流数据保存到您的 Atlas 集群中的集合,请使用 $merge 阶段和 $source 阶段。$source 阶段指定了从中读取数据的主题。$merge 阶段会将此数据写入目标集合。
使用以下语法:
{ "$source": { "connectionName": "<registered-kafka-connection>", "topic": [ "<topic-name-1>", "<topic-name-2>", ... ] } }, { "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> } }, ... }
示例
基本示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source阶段与 Apache Kafka 代理建立连接,在名为my_weatherdata的主题中收集这些报告,从而在将每条记录引入后续聚合阶段时将其公开。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为ingestionTime。$match阶段会排除dewPoint.value小于或等于5.0的文档,并将dewPoint.value大于5.0的文档传递到下一个阶段。$merge阶段将输出写入sample_weatherstream数据库中名为stream的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。
复制变更流事件
您可以使用 $merge.whenMatched 和 $merge.whenNotMatched 参数,根据变更流事件的操作类型来复制其影响。
以下聚合有四个阶段:
$source阶段通过atlas1连接在 Atlas 集群上建立与db1.coll1集合的连接。在
$addFields阶段,每个文档的"$operationType值与"delete"之间的相等性检查值将被设置为fullDocument._isDelete字段,从而丰富了输入文档的内容。此等式的计算结果为布尔值。$replaceRoot阶段将文档替换为增强的$fullDocument字段的值。$merge阶段通过atlas2连接写入db1.coll1,对每个文档执行两项检查:首先,
whenMatched字段检查文档是否与db1.coll1集合中通过_id匹配的现有文档匹配,这是默认的匹配字段,因为on未显式设置。如果条件成立,并且fullDocument._isDelete设置为true,那么 Atlas 会删除匹配的文档。如果匹配并且fullDocument._isDelete设置为false,则 Atlas 会用流数据源中的新文档替换匹配的文档。其次,如果 Atlas Stream Processing 没有找到匹配的文档并且
fullDocument._isDelete为 true,Atlas 会丢弃该文档,而不是将其写入集合。如果没有此类匹配文档并且fullDocument._isDelete为 false,Atlas 会将流数据源中的文档插入到集合中。
{ $source: { connectionName: “atlas1”, db: “db1”, coll: “coll1”, fullDocument: “required” } }, { $addFields: { “fullDocument._isDelete”: { $eq: [ “$operationType”, “delete” ] } } }, { $replaceRoot: { newRoot: “$fullDocument” } }, { $merge: { into: { connectionName: “atlas2”, db: “db1”, coll: “coll1” }, whenMatched: { $cond: { if: “$_isDelete”, then: “delete”, else: “replace” } }, whenNotMatched: { $cond: { if: “$_isDelete”, then: “discard”, else: “insert” } }, } }