定义
$source 阶段在 Connection Registry 中指定要从中流式传输数据的连接。支持以下连接类型:
Apache Kafka 代理
MongoDB collection change stream
MongoDB database change stream
文档数组
语法
Apache Kafka 代理
为了操作来自Apache Kafka代理的流媒体数据,$source 阶段具有以下原型形式:
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
$source 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 | |
|---|---|---|---|---|
| 字符串 | 必需 | 用于标识连接注册表中要从中提取数据的连接的标签。 | |
| 字符串或字符串数组 | 必需 | 一个或多个 Apache Kafka 主题的名称,用于从这些主题流式传输信息。如果要流式传输多个主题的消息,请在数组中指定这些主题。 | |
| 文档 | Optional | 为传入消息定义权威时间戳的文档。 如果使用
如果您未声明 | |
| 文档 | Optional | 指定分区在水印计算中被忽略之前允许其空闲的时间长度的文档。 默认情况下,此字段为禁用。要处理因空闲而无进展的分区,请为此字段设置一个值。 | |
| 整型 | Optional | 指定分区空闲超时持续时间的数字。 | |
| 字符串 | Optional | 分区空闲超时持续时间的时间单位。
| |
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 | |
| 字符串 | Optional | 指定从 Apache Kafka 源主题中的哪个事件开始摄取。
默认值为 | |
| 字符串 | Optional | 与流处理器关联的 kafka 使用者群组的ID 。如果省略, Atlas Stream Processing会将流处理工作区与以下格式的自动生成的ID相关联: Atlas Stream Processing会自动为所有持久流处理器生成此参数的值。对于使用 sp 定义的临时流处理器。 进程(),仅当您手动定义该参数时才会设立该参数。 | |
| 布尔 | 可选的 | 用于确定Kafka代理分区偏移提交策略的标志。 Atlas Stream Processing支持两种提交策略:
仅当设立了 对于使用 sp | |
| 字符串 | Optional | ||
| 字符串 | Optional | 如何处理在反序列化 Apache Kafka 密钥数据时遇到的错误。必须是以下值之一:
|
注意
Atlas Stream Processing 要求源数据流中的文档在 json 或 ejson有效。Atlas Stream Processing 会将不满足此要求的文档设置为死信队列 (如果您已配置)。
MongoDB 集合变更流
Atlas 集合变更流允许应用程序访问针对单个集合的实时数据更改。要学习如何对集合打开变更流,请参阅变更流。
为了操作来自 Atlas collection 的流媒体数据, $source阶段具有以下原型形式:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "coll" : ["<source-coll>",...], "initialSync": { "enable": <boolean>, "parallelism": <integer> }, "readPreference": "<read-preference>", "readPreferenceTags": [ {"<key>": "<value>"}, . . . ] "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }], "maxAwaitTimeMS": <time-ms>, } } }
$source 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
|---|---|---|---|
| 字符串 | 可选的 | 用于标识连接注册表中要从中提取数据的连接的标签。 |
| 文档 | Optional | 为传入消息定义权威时间戳的文档。 如果使用
如果您未声明 |
| 字符串 | 必需 | 在 |
| 字符串或字符串数组 | 必需 | 在 |
| 文档 | Optional | 包含与 Atlas Stream Processing 如果启用 重要您只能在传入文档的 |
| 布尔 | 可选的 | 确定是否启用 |
| 整型 | Optional | 确定进程 |
| 文档 | Optional | |
| 文档 | Optional | 读取 |
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 |
| token | 可选的 | 源开始报告的变更事件。这采用 resume token 的形式。 您只能使用 |
| timestamp | 可选的 | 源应开始报告的操作时间。 您只能使用 接受 MongoDB 扩展 JSON |
| 字符串 | 可选的 | 用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:
如果没有为 fullDocument 指定值,则默认为 要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像。 |
| 布尔 | 可选的 | 用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回 要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像。 |
| 字符串 | Optional | 指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:
如果没有为 要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像。 |
| 文档 | Optional | 指定一个聚合管道,用于传递变更流输出进行进一步处理之前对其进行过滤。该管道必须符合修改变更流输出中描述的参数。 重要每个变更事件都包括 |
| 整型 | Optional | 在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。 默认值为 |
MongoDB database change stream
Atlas 数据库变更流允许应用程序访问针对单个数据库的实时数据更改。要学习如何针对数据库打开变更流,请参阅变更流。
为了操作来自 Atlas 数据库变更流的流数据, $source阶段具有以下原型形式:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
|---|---|---|---|
| 字符串 | 可选的 | 用于标识连接注册表中要从中提取数据的连接的标签。 |
| 文档 | Optional | 为传入消息定义权威时间戳的文档。 如果使用
如果您未声明 |
| 字符串 | 必需 | 在 |
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 |
| token | 可选的 | 源开始报告的变更事件。这采用 resume token 的形式。 您只能使用 |
| timestamp | 可选的 | 源应开始报告的操作时间。 您只能使用 接受 MongoDB 扩展 JSON |
| 字符串 | 可选的 | 用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:
如果没有为 fullDocument 指定值,则默认为 要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。 |
| 布尔 | 可选的 | 用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回 要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。 |
| 字符串 | Optional | 指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:
如果没有为 要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。 |
| 文档 | Optional | 指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合修改变更流输出中描述的参数。 重要每个变更事件都包括 |
| 整型 | Optional | 在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。 默认值为 |
MongoDB 集群范围变更流源
要对来自整个 Atlas 集群变更流的流数据进行操作, $source 阶段具有以下原型形式:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
|---|---|---|---|
| 字符串 | 可选的 | 用于标识连接注册表中要从中提取数据的连接的标签。 |
| 文档 | Optional | 为传入消息定义权威时间戳的文档。 如果使用
如果您未声明 |
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 |
| token | 可选的 | 源开始报告的变更事件。这采用 resume token 的形式。 您只能使用 |
| 日期 | 时间戳 | 可选的 | 源应开始报告的操作时间。 您只能使用 接受 MongoDB 扩展 JSON |
| 字符串 | 可选的 | 用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:
如果没有为 fullDocument 指定值,则默认为 要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。 |
| 布尔 | 可选的 | 用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回 要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。 |
| 字符串 | Optional | 指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:
如果没有为 要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。 |
| 文档 | Optional | 指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合修改变更流输出中描述的参数。 请注意, Atlas Stream Processing希望从每个摄取的变更事件中接收 |
| 整型 | Optional | 在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。 默认值为 |
文档数组
为了对文档数组进行操作, $source阶段具有以下原型形式:
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "documents" : [{source-doc},...] | <expression> } }
$source 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
|---|---|---|---|
| 文档 | Optional | 为传入消息定义权威时间戳的文档。 如果使用
如果您未声明 |
| 阵列 | 可选的 | 用作流媒体数据源的文档数组。该字段的值可以是对象数组,也可以是计算结果为对象数组的表达式。使用 |
行为
示例
Kafka 示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source阶段与 Apache Kafka 代理建立连接,在名为my_weatherdata的主题中收集这些报告,从而在将每条记录引入后续聚合阶段时将其公开。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为ingestionTime。$match阶段会排除dewPoint.value小于或等于5.0的文档,并将dewPoint.value大于5.0的文档传递到下一个阶段。$merge阶段将输出写入sample_weatherstream数据库中名为stream的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。
变更流示例
以下聚合从cluster0-collection 源摄取数据,该源连接到加载示例数据集的Atlas 集群。要学习;了解如何创建流处理工作区以及如何将与Atlas 集群的连接添加到连接注册表中,请参阅Atlas Stream Processing入门。此聚合运行两个阶段以打开变更流并记录对data sample_weatherdata数据库中的 集合的变更:
$source阶段会连接到cluster0-collection源,并针对sample_weatherdata数据库中的data集合打开变更流。$merge阶段会将过滤后的变更流文档写入sample_weatherdata数据库中名为data_changes的 Atlas 集合。如果不存在此类集合,Atlas 则会进行创建。
{ $source: { connectionName: "cluster0-connection", db : "sample_weatherdata", coll : "data" }, $merge: { into: { connectionName: "cluster0-connection", db: "sample_weatherdata", coll: "data_changes" } } }
以下 mongosh 命令会删除 data 文档:
db.getSiblingDB("sample_weatherdata").data.deleteOne( { _id: ObjectId("5553a99ae4b02cf715120e4b") } )
删除 data 文档后,流处理器会将此变更流事件文档写入 sample_weatherdata.data_changes 集合。要查看生成的 sample_weatherdata.data_changes 集合中的文档,请使用 mongosh 连接到您的 Atlas 集群并运行以下命令:
db.getSiblingDB("sample_weatherdata").data_changes.find()
[ { _id: { _data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004' }, clusterTime: Timestamp({ t: 1738790819, i: 1 }), documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') }, ns: { db: 'sample_weatherdata', coll: 'data' }, operationType: 'delete', wallTime: ISODate('2025-02-05T21:26:59.313Z') } ]