Docs 菜单
Docs 主页
/ /
/ / /

$source 阶段(流处理)

$source

$source 阶段在 Connection Registry 中指定要从中流式传输数据的连接。支持以下连接类型:

为了操作来自Apache Kafka代理的流媒体数据,$source 阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

用于标识连接注册表中要从中提取数据的连接的标签。

topic

字符串或字符串数组

必需

一个或多个 Apache Kafka 主题的名称,用于从这些主题流式传输信息。如果要流式传输多个主题的消息,请在数组中指定这些主题。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

partitionIdleTimeout

文档

Optional

指定分区在水印计算中被忽略之前允许其空闲的时间长度的文档。

默认情况下,此字段为禁用。要处理因空闲而无进展的分区,请为此字段设置一个值。

partitionIdleTimeout.size

整型

Optional

指定分区空闲超时持续时间的数字。

partitionIdleTimeout.unit

字符串

Optional

分区空闲超时持续时间的时间单位。

unit的值可以是以下值之一:

  • "ms" (毫秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.auto_offset_reset

字符串

Optional

指定从 Apache Kafka 源主题中的哪个事件开始摄取。auto_offset_reset 采用以下值:

  • endlatestlargest :在初始化聚合时从主题中的最新事件开始摄取。

  • earliestbeginningsmallest :从主题中最早的事件开始摄取。

默认值为 latest

config.group_id

字符串

Optional

与流处理器关联的 kafka 使用者群组的ID 。如果省略, Atlas Stream Processing会将流处理工作区与以下格式的自动生成的ID相关联:

asp-${streamProcessorId}-consumer

Atlas Stream Processing会自动为所有持久流处理器生成此参数的值。对于使用 sp 定义的临时流处理器。 进程(),仅当您手动定义该参数时才会设立该参数。

config.enable_auto_commit

布尔

可选的

用于确定Kafka代理分区偏移提交策略的标志。 Atlas Stream Processing支持两种提交策略:

  • 如果将此参数设立为 true,则每次 $source 阶段将数据传递给下一个操作符时, Atlas Stream Processing都会提交偏移量。

  • 如果将此参数设立为 false,流处理器会在Atlas Stream Processing获取检查点时提交分区偏移量。

仅当设立了config.group_id 时才能设立此参数。

对于使用 sp falsegroup_id定义的临时流处理器。 进程(),此参数默认为 ,除非您设立了 。否则默认为true

config.keyFormat

字符串

Optional

用于反序列化 Apache Kafka key 数据的数据类型。必须是以下值之一:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

默认值为 binData

config.keyFormatError

字符串

Optional

如何处理在反序列化 Apache Kafka 密钥数据时遇到的错误。必须是以下值之一:

  • dlq,将文档写入您的死信队列。

  • passThrough,将文档发送到缺少关键数据的下一阶段。

注意

Atlas Stream Processing 要求源数据流中的文档在 jsonejson有效。Atlas Stream Processing 会将不满足此要求的文档设置为死信队列 (如果您已配置)。

Atlas 集合变更流允许应用程序访问针对单个集合的实时数据更改。要学习如何对集合打开变更流,请参阅变更流

为了操作来自 Atlas collection 的流媒体数据, $source阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"initialSync": {
"enable": <boolean>,
"parallelism": <integer>
},
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
]
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}],
"maxAwaitTimeMS": <time-ms>,
}
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

db

字符串

必需

connectionName指定的 Atlas 实例上托管的 MongoDB database 的名称。此数据库的change stream充当流媒体数据源。

coll

字符串或字符串数组

必需

connectionName 指定的Atlas实例上托管的一个或多个MongoDB集合的名称。 这些集合的变更流充当流媒体数据源。 如果省略此字段,流处理器将从MongoDB数据库变更流中获取数据。

initialSync

文档

Optional

包含与 initialSync 功能相关的字段的文档。

Atlas Stream Processing initialSync 允许您摄取Atlas集合中预先存在的文档,就像它们是插入 changeEvent 文档一样。如果启用initialSync,当您启动流处理器时,它将首先摄取并进程集合中的所有现有文档,然后再摄取和进程新传入的 changeEvent 文档。initialSync 一旦完成,就不再重复。

如果启用initialSync,则无法在管道中使用 $hoppingWindow$sessionWindow$tumblingWindow 阶段。

重要

您只能在传入文档的 _id 值为默认生成的 ObjectId 值或有序的 int/long 值的集合上使用 initialSync。所有 _id 值必须为同一类型。

initialSync.enable

布尔

可选的

确定是否启用initialSync。如果声明 initialSync字段,则必须设立此字段。

initialSync.parallelism

整型

Optional

确定进程initialSync 操作的并行度级别。如果不指定值,则默认为 1

readPreference

文档

Optional

initialSync操作的读取偏好。

默认值为 primary

readPreferenceTags

文档

Optional

读取 initialSync操作的偏好标签。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

接受 MongoDB 扩展 JSON $date$timestamp 值。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.pipeline

文档

Optional

指定一个聚合管道,用于传递变更流输出进行进一步处理之前对其进行过滤。该管道必须符合修改变更流输出中描述的参数。

重要

每个变更事件都包括 wallTimeclusterTime 字段。$source 之后的Atlas Stream Processing阶段希望在处理器摄取这些字段时接收这些字段。为确保正确处理变更流数据,请勿修改 $source.config.pipeline 中的这些字段。

config.maxAwaitTimeMS

整型

Optional

在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000

Atlas 数据库变更流允许应用程序访问针对单个数据库的实时数据更改。要学习如何针对数据库打开变更流,请参阅变更流

为了操作来自 Atlas 数据库变更流的流数据, $source阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

db

字符串

必需

connectionName指定的 Atlas 实例上托管的 MongoDB database 的名称。此数据库的change stream充当流媒体数据源。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

接受 MongoDB 扩展 JSON $date$timestamp 值。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合修改变更流输出中描述的参数。

重要

每个变更事件都包括 wallTimeclusterTime 字段。$source 之后的Atlas Stream Processing阶段希望在处理器摄取这些字段时接收这些字段。为确保正确处理变更流数据,请勿修改 $source.config.pipeline 中的这些字段。

config.maxAwaitTimeMS

整型

Optional

在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000

要对来自整个 Atlas 集群变更流的流数据进行操作, $source 阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

日期 | 时间戳

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

接受 MongoDB 扩展 JSON $date$timestamp 值。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合修改变更流输出中描述的参数。

请注意, Atlas Stream Processing希望从每个摄取的变更事件中接收 wallTimeclusterTime 字段。为确保正确处理变更流数据,请勿修改 $source.config.pipeline 中的这些字段。

config.maxAwaitTimeMS

整型

Optional

在返回空批次之前,等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000

为了对文档数组进行操作, $source阶段具有以下原型形式:

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"documents" : [{source-doc},...] | <expression>
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

documents

阵列

可选的

用作流媒体数据源的文档数组。该字段的值可以是对象数组,也可以是计算结果为对象数组的表达式。使用connectionName字段时请勿使用此字段。

$source必须是它所在的任何管道的第一阶段。 每个管道只能使用一个$source阶段。

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source 阶段与 Apache Kafka 代理建立连接,在名为 my_weatherdata 的主题中收集这些报告,从而在将每条记录引入后续聚合阶段时将其公开。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为 ingestionTime

  2. $match 阶段会排除 dewPoint.value 小于或等于 5.0 的文档,并将 dewPoint.value 大于 5.0 的文档传递到下一个阶段。

  3. $merge 阶段将输出写入 sample_weatherstream 数据库中名为 stream 的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

以下聚合从cluster0-collection 源摄取数据,该源连接到加载示例数据集的Atlas 集群。要学习;了解如何创建流处理工作区以及如何将与Atlas 集群的连接添加到连接注册表中,请参阅Atlas Stream Processing入门。此聚合运行两个阶段以打开变更流并记录对data sample_weatherdata数据库中的 集合的变更:

  1. $source 阶段会连接到 cluster0-collection 源,并针对 sample_weatherdata 数据库中的 data 集合打开变更流。

  2. $merge 阶段会将过滤后的变更流文档写入 sample_weatherdata 数据库中名为 data_changes 的 Atlas 集合。如果不存在此类集合,Atlas 则会进行创建。

{
$source: {
connectionName: "cluster0-connection",
db : "sample_weatherdata",
coll : "data"
},
$merge: {
into: {
connectionName: "cluster0-connection",
db: "sample_weatherdata",
coll: "data_changes"
}
}
}

以下 mongosh 命令会删除 data 文档:

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

删除 data 文档后,流处理器会将此变更流事件文档写入 sample_weatherdata.data_changes 集合。要查看生成的 sample_weatherdata.data_changes 集合中的文档,请使用 mongosh 连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]

后退

聚合阶段

在此页面上