Docs 菜单
Docs 主页
/ /
/ / /

$source 阶段(流处理)

$source

$source 阶段在 Connection Registry 中指定要从中流式传输数据的连接。支持以下连接类型:

  • Apache Kafka 代理

  • MongoDB collection change stream

  • MongoDB database change stream

  • MongoDB 集群变更流

  • AWS Kinesis数据流

  • 文档数组

为了操作来自Apache Kafka代理的流媒体数据,$source 阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

用于标识连接注册表中要从中提取数据的连接的标签。

topic

字符串或字符串数组

必需

从中流式传输消息的一个或多个Apache Kafka主题的名称。如果要流多个主题的消息,请在数组中指定这些主题。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的表达式:$toDate

  • 将源消息字段作为参数的表达式:$dateFromString

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

partitionIdleTimeout

文档

Optional

指定分区在水印计算中被忽略之前允许其空闲的时间长度的文档。

默认情况下,此字段为禁用。要处理因空闲而无进展的分区,请为此字段设置一个值。

partitionIdleTimeout.size

整型

Optional

指定分区空闲超时持续时间的数字。

partitionIdleTimeout.unit

字符串

Optional

分区空闲超时持续时间的时间单位。

unit的值可以是以下值之一:

  • "ms" (毫秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.auto_offset_reset

字符串

Optional

指定从 Apache Kafka 源主题中的哪个事件开始摄取。auto_offset_reset 采用以下值:

  • endlatestlargest :在初始化聚合时从主题中的最新事件开始摄取。

  • earliestbeginningsmallest :从主题中最早的事件开始摄取。

默认值为 latest

config.group_id

字符串

Optional

Kafka 消费者组的 ID 用于与流处理器关联。如果省略,Atlas Stream Processing 会将 Stream Processing 工作区关联为以下格式的自动生成 ID:

asp-${streamProcessorId}-consumer

Atlas Stream Processing会自动为所有持久流处理器生成此参数的值。对于使用 sp 定义的临时流处理器。 进程(),仅当您手动定义该参数时才会设立该参数。

config.enable_auto_commit

布尔

可选的

用于确定Kafka代理分区偏移提交策略的标志。Atlas Stream Processing支持两种提交策略:

  • 如果将此参数设立为 true,则每次 $source 阶段将数据传递给下一个操作符时, Atlas Stream Processing都会提交偏移量。

  • 如果将此参数设立为 false,流处理器会在Atlas Stream Processing获取检查点时提交分区偏移量。

仅当设立了 config.group_id 时才能设立此参数。

对于使用 sp falsegroup_id定义的临时流处理器。 进程(),此参数默认为 ,除非您设立了 。否则默认为true

config.keyFormat

字符串

Optional

用于反序列化 Apache Kafka key 数据的数据类型。必须是以下值之一:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

默认值为 binData

config.keyFormatError

字符串

Optional

如何处理在反序列化 Apache Kafka 密钥数据时遇到的错误。必须是以下值之一:

  • dlq,将文档写入您的死信队列。

  • passThrough,将文档发送到缺少关键数据的下一阶段。

注意

Atlas Stream Processing 要求源数据流中的文档在 jsonejson有效。Atlas Stream Processing 会将不满足此要求的文档设置为死信队列 (如果您已配置)。

Atlas 集合变更流允许应用程序访问针对单个集合的实时数据更改。要学习如何对集合打开变更流,请参阅变更流

为了操作来自 Atlas collection 的流媒体数据, $source阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"initialSync": {
"enable": <boolean>,
"parallelism": <integer>
},
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
]
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}],
"maxAwaitTimeMS": <time-ms>,
}
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

db

字符串

必需

connectionName指定的 Atlas 实例上托管的 MongoDB database 的名称。此数据库的change stream充当流媒体数据源。

coll

字符串或字符串数组

必需

connectionName 指定的Atlas实例上托管的一个或多个MongoDB集合的名称。 这些集合的变更流充当流媒体数据源。 如果省略此字段,流处理器将从MongoDB数据库变更流中获取数据。

initialSync

文档

Optional

包含与 initialSync 功能相关的字段的文档。

Atlas Stream Processing initialSync 允许您摄取Atlas集合中预先存在的文档,就像它们是插入 changeEvent 文档一样。如果启用initialSync,当您启动流处理器时,它将首先摄取并进程集合中的所有现有文档,然后再摄取和进程新传入的 changeEvent 文档。initialSync 一旦完成,就不再重复。

如果启用initialSync,则无法在管道中使用 $hoppingWindow$sessionWindow$tumblingWindow 阶段。

重要提示:您只能在传入文档的 _id 值为默认生成的ObjectId值或有序的 int/long 值的集合上使用 initialSync。所有 _id 值必须为同一类型。

initialSync.enable

布尔

可选的

确定是否启用initialSync。如果声明 initialSync字段,则必须设立此字段。

initialSync.parallelism

整型

Optional

确定进程initialSync 操作的并行度级别。如果不指定值,则默认为 1

每个流处理器都有一个由其层级确定的最大累积并行度值。流处理器的累积并行度计算如下:

parallelism total - parallelized stages

其中,parallelism total$source$lookup$merge 阶段中所有大于 1 的 parallelism 值的总和,parallelized stagesparallelism 值大于 1

示例,如果您的 $source 阶段将 parallelism 值设置为 4,则 $lookup 阶段未设置 parallelism 值(因此默认为 1),并且 $merge 阶段将 parallelism 值为 2,则有两个 parallelized stages,流处理器的累积并行度计算为 (4 + 2) - 2

如果流处理器超过其层级的最大累积并行度, Atlas Stream Processing会引发错误,并建议您实现预期并行度层级所需的最低处理器层级。您必须将处理器扩展到更高层级,或降低阶段的并行度值,才能解决该错误。要学习,了解更多信息,请参阅Stream Processing

readPreference

文档

Optional

initialSync操作的读取偏好。

默认值为 primary

readPreferenceTags

文档

Optional

读取 initialSync操作的偏好标签。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

接受 MongoDB 扩展 JSON $date$timestamp 值。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.pipeline

文档

Optional

指定一个聚合管道,用于传递变更流输出进行进一步处理之前对其进行过滤。该管道必须符合修改变更流输出中描述的参数。

重要提示:每个变更事件都包括 wallTimeclusterTime 字段。$source 之后的 Atlas Stream Processing 阶段期望接收到处理器摄取的这些字段。为确保正确处理变更流数据,请勿修改 $source.config.pipeline 中的这些字段。

config.maxAwaitTimeMS

整型

Optional

在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000

Atlas 数据库变更流允许应用程序访问针对单个数据库的实时数据更改。要学习如何针对数据库打开变更流,请参阅变更流

为了操作来自 Atlas 数据库变更流的流数据, $source阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

db

字符串

必需

connectionName指定的 Atlas 实例上托管的 MongoDB database 的名称。此数据库的change stream充当流媒体数据源。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

接受 MongoDB 扩展 JSON $date$timestamp 值。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合修改变更流输出中描述的参数。

重要提示:每个变更事件都包括 wallTimeclusterTime 字段。$source 之后的 Atlas Stream Processing 阶段期望接收到处理器摄取的这些字段。为确保正确处理变更流数据,请勿修改 $source.config.pipeline 中的这些字段。

config.maxAwaitTimeMS

整型

Optional

在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000

要对来自整个 Atlas 集群变更流的流数据进行操作, $source 阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

日期 | 时间戳

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

接受 MongoDB 扩展 JSON $date$timestamp 值。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合修改变更流输出中描述的参数。

请注意, Atlas Stream Processing希望从每个摄取的变更事件中接收 wallTimeclusterTime 字段。为确保正确处理变更流数据,请勿修改 $source.config.pipeline 中的这些字段。

config.maxAwaitTimeMS

整型

Optional

在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000

为了对来自 AWS Kinesis 数据流的数据进行操作,$source 阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"stream": "<stream-name>",
"region": "<aws-region>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<field-name>",
"shardIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"consumerARN": "<aws-arn>",
"initialPosition": <initial-position>,
reshardDetectionIntervalSecs: <interval>
}
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

用于标识连接注册表中连接的标签,从中提取数据。

stream

字符串

必需

从中流式传输消息的 AWS Kinesis 数据流。

region

字符串

可选的

指定流所在的 AWS地区。Kinesis支持在不同区域具有相同名称的多个数据流。如果您在同一连接内的两个或多个区域中对数据流使用相同的名称,则必须使用此字段来指定要使用的名称和区域的组合。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

shardIdleTimeout

文档

Optional

指定分片在水印计算中被忽略之前允许分片的时间长度的文档。

默认下,此字段处于禁用状态。要处理由于空闲而不向前移动的分片,请为此字段设立一个值。

shardIdleTimeout.size

文档

Optional

指定分片空闲超时持续时间的数字。

shardIdleTimeout.unit

文档

Optional

分片空闲超时持续时间的时间单位。

unit的值可以是以下值之一:

  • "ms" (毫秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.consumerARN

字符串

Optional

Kinesis 消费者相对应的 ARN。如果您指定此字段,您的使用者将使用增强型扇出;否则, Kinesis将使用标准使用者。

config.initialPosition

字符串

Optional

在Kinesis数据流历史记录中开始摄取消息的位置。必须是以下任一项:

  • "TRIM_HORIZON":从分片最早的消息开始摄取。

  • "LATEST":从分片的最新消息开始摄取。

默认为“LATEST”。

reshardDetectionIntervalSecs

整型

Optional

为了重新分片而检查流经Kinesis流的数据速率的时间间隔(以秒为单位)。

为了对文档数组进行操作, $source阶段具有以下原型形式:

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"documents" : [{source-doc},...] | <expression>
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

documents

阵列

可选的

用作流媒体数据源的文档数组。该字段的值可以是对象数组,也可以是计算结果为对象数组的表达式。使用connectionName字段时请勿使用此字段。

$source必须是它所在的任何管道的第一阶段。 每个管道只能使用一个$source阶段。

对于Kafka $source 阶段, Atlas Stream Processing会并行读取源主题内的多个分区。分区限制由处理器层级决定。如需学习;了解更多信息,请参阅流处理计费参考文档。

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source 阶段与 Apache Kafka 代理建立连接,在名为 my_weatherdata 的主题中收集这些报告,从而在将每条记录引入后续聚合阶段时将其公开。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为 ingestionTime

  2. $match 阶段会排除 dewPoint.value 小于或等于 5.0 的文档,并将 dewPoint.value 大于 5.0 的文档传递到下一个阶段。

  3. $merge 阶段将输出写入 sample_weatherstream 数据库中名为 stream 的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

以下聚合会从 cluster0-collection 源引入数据,而该源会连接到已加载示例数据集的 Atlas 集群。要了解如何创建 Stream Processing 工作区并将 Atlas 集群连接添加到连接注册表,请参阅Atlas Stream Processing 入门。此聚合会运行两个阶段以打开变更流并记录对 sample_weatherdata 数据库中 data 集合的更改:

  1. $source 阶段会连接到 cluster0-collection 源,并针对 sample_weatherdata 数据库中的 data 集合打开变更流。

  2. $merge 阶段会将过滤后的变更流文档写入 sample_weatherdata 数据库中名为 data_changes 的 Atlas 集合。如果不存在此类集合,Atlas 则会进行创建。

{
$source: {
connectionName: "cluster0-connection",
db : "sample_weatherdata",
coll : "data"
},
$merge: {
into: {
connectionName: "cluster0-connection",
db: "sample_weatherdata",
coll: "data_changes"
}
}
}

以下 mongosh 命令会删除 data 文档:

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

删除 data 文档后,流处理器会将此变更流事件文档写入 sample_weatherdata.data_changes 集合。要查看生成的 sample_weatherdata.data_changes 集合中的文档,请使用 mongosh 连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]

后退

聚合阶段

在此页面上