Docs 菜单
Docs 主页
/
Atlas
/ /

$emit 聚合阶段(流处理)

$emit 阶段在 连接注册表中指定要向其发送消息的连接。该连接必须是 Apache Kafka 代理或时间序列集合。

要将处理后的数据写入Apache Kafka代理,请使用具有以下原型形式的 $emit管道阶段:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic": "<target-topic>" | <expression>,
"config": {
"acks": <number-of-acknowledgements>,
"compression_type": "<compression-type>",
"dateFormat": "default" | "ISO8601",
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<serialization-type>",
"outputFormat": "<json-format>",
"tombstoneWhen": <expression>
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

从中提取数据的连接的名称,如连接注册表所示。

topic

字符串 |表达式

必需

要向其发出消息的Apache Kafka主题的名称。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.acks

int

Optional

成功执行 $emit 操作所需的来自 Apache Kafka 集群的确认次数。

默认值为 all。Atlas Stream Processing 支持以下值:

  • -1

  • 0

  • 1

  • all

config.compression_type

字符串

Optional

用于制造者生成的所有数据的压缩类型。默认值为“无”(即,不压缩)。有效值为:

  • none

  • gzip

  • snappy

  • lz4

  • zstd

压缩可用于完整批次的数据,因此批处理的效率会影响压缩比;批处理越多,压缩效果越好。

config.dateFormat

字符串

Optional

日期值的日期格式。有效值为:

  • default - 使用 outputFormat 的默认值。

  • ISO8601 - 将日期转换为 ISO8601 格式的字符串,其中包含毫秒精度 (YYYY-MM-DDTHH:mm:ss.sssZ)。

例如:

请考虑以下输入。

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

如果将 $emit.config.dateFormat 设置为 default,输出将类似于以下内容:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

如果将 $emit.config.dateFormat 设置为 ISO8601,输出将类似于以下内容:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.headers

表达式(expression)

Optional

要添加到输出消息的标题。该表达式的计算结果必须为对象或数组。

如果表达式的计算结果为对象,Atlas Stream Processing 将根据该对象中的每个键值对构造一个标头,其中键是标头名称,值是标头值。

如果表达式的计算结果为数组,则它必须采用键值对对象数组的形式。例如:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing 从数组中的每个对象构造一个标头,其中键是标头名称,值是标头值。 Atlas Stream Processing 支持以下类型的标头值:

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

对象 | 字符串 | 表达式

Optional

求值为 Apache Kafka 消息键的表达式。

如果您指定 config.key,则必须指定 config.keyFormat

config.keyFormat

字符串

可选的

用于序列化Apache Kafka关键数据的数据类型。必须是以下值之一:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

默认为 binData。如果您指定 config.key,则必须指定 config.keyFormat。如果文档的 config.key 未成功序列化为指定的数据类型,Atlas Stream Processing 会将其发送到死信队列。

config.outputFormat

字符串

Optional

Apache Kafka 发送消息时使用的 JSON 格式。必须是以下值之一:

  • "relaxedJson"

  • "canonicalJson"

默认值为 "relaxedJson"

config.tombstoneWhen

表达式(expression)

Optional

用于确定何时向Kafka发出 null 的表达式。该表达式的计算结果必须为布尔值 truefalse。当给定文档的表达式计算结果为 true 时, Atlas Stream Processing会向Kafka接收器发出一个 null 代替。当表达式的计算结果为 false 时, Atlas Stream Processing在到达 $emit 阶段时按原样发出文档。

如果表达式无法计算出布尔值,或者无法计算, Atlas Stream Processing会将文档写入 DLQ。

如果您提供 $emit.config.key$emit.config.keyFormat 值,则此设置可用于启用主题压实。如果您不提供这些值,当该表达式的计算结果为 true 时, Atlas Stream Processing仍会发出 null,但不会触发Kafka主题压实。

要将处理后的数据写入 Atlas 时间序列集合,请使用具有以下原型形式的$emit管道阶段:

{
"$emit": {
"connectionName": "<registered-connection>",
"db": "<target-db>" | <expression>,
"coll": "<target-coll>" | <expression>,
"timeseries": {
<options>
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

从中提取数据的连接的名称,如连接注册表所示。

db

字符串 |表达式

必需

解析为包含目标时间序列集合的Atlas数据库的名称或表达式。

coll

字符串 |表达式

必需

解析为要写入的Atlas时间序列集合的名称或表达式。

timeseries

文档

必需

定义集合的时间序列字段的文档。

注意

时间序列集合内文档的最大大小为4 MB。 要了解详情,请参阅时间序列集合限制。

要将处理后的数据写入 AWS S3 存储桶的接收器连接,请使用 $emit 管道阶段,并采用以下原型形式:

{
"$emit": {
"connectionName": "<registered-connection>",
"bucket": "<target-bucket>",
"region": "<target-region>",
"path": "<key-prefix>" | <expression>,
"config": {
"writeOptions": {
"count": <doc-count>,
"bytes": <threshold>,
"interval": {
"size": <unit-count>,
"unit": "<time-denomination>"
}
},
"delimiter": "<delimiter>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
"compression": "gzip" | "snappy",
"compressionLevel": <level>
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

从中写入数据的连接的名称,如连接注册表所示。

bucket

字符串

必需

要写入数据的 S3 存储桶的名称。

region

字符串

Optional

目标存储桶所在的 AWS 区域的名称。如果您在 AWS 区域托管流处理实例,此参数将默认为该区域。否则,它会默认选择距离流处理实例托管区域最近的 AWS 区域。

path

字符串 |表达式

必需

写入 S3 存储桶的对象键的前缀。必须是文字前缀字符串或计算结果为字符串的表达式。

config

文档

Optional

包含附加参数的文档,这些参数会覆盖各种默认值。

config.writeOptions

文档

Optional

包含控制写入行为的附加参数的文档。这些参数会根据首先满足的阈值触发写入行为。

例如,如果摄入的文档达到 config.writeOptions.count 阈值但未达到 config.writeOptions.interval 阈值,流处理器仍会根据 config.writeOptions.count 阈值将这些文档发送到 S3

config.writeOptions.count

整型

Optional

写入 S3 的每个文件中要分组的文档数量。

config.writeOptions.bytes

整型

Optional

指定在将文件写入 S3 之前必须累积的最小字节数。字节计数由管道摄取的 BSON 文档的大小决定,而不是最终输出文件的大小。

config.writeOptions.interval

文档

Optional

指定用于批量写入文档的计时器,作为 sizeunits 的组合。

默认为 1 分钟。您无法将任何 unitsize 设置为 0。最大间隔为 7 天。

config.writeOptions.interval.size

整型

可选的

writeOptions.interval.units 指定的单位数,之后流处理器将文档批量写入 S3

默认为 1。您不能将 size 设置为 0。如果您定义 writeOptions.interval,则还必须定义此参数。

config.writeOptions.interval.units

字符串

可选的

用于计数批量写入计时器的时间单位。该参数支持以下值:

  • ms

  • second

  • minute

  • hour

  • day

默认为 minute。如果您定义 writeOptions.interval,则还必须定义此参数。

config.delimiter

字符串

Optional

生成文件中每个条目之间的分隔符。

默认值为 \n

config.outputFormat

字符串

Optional

指定写入 S3 的 JSON 输出格式。必须是以下值之一:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

默认为“relaxedJson”。

要了解更多信息,请参阅 JSON。

config.dateFormat

字符串

Optional

日期值的日期格式。有效值为:

  • default - 使用 outputFormat 的默认值。

  • ISO8601 - 将日期转换为 ISO8601 格式的字符串,其中包含毫秒精度 (YYYY-MM-DDTHH:mm:ss.sssZ)。

例如,如果您将以下记录添加到管道中:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

那么,如果将 $emit.config.dateFormat 设置为 default,输出将类似于以下内容:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

如果将 $emit.config.dateFormat 设置为 ISO8601,输出将类似于以下内容:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

字符串

Optional

要使用的压缩算法名称。必须是以下值之一:

  • "gzip"

  • "snappy"

config.compressionLevel

字符串

可选的

应用于所发出消息的压缩级别。支持值包括 1-9;值越高,压缩率越高。

默认值为 6

此参数为必填项,且仅限用于 gzip。如果将 config.compression 设置为 snappy,设置此参数将没有效果。

为了简化消息摄取,Atlas Stream Processing 支持 Basic JSON,这简化了 RelaxedJSON 格式。下表展示了所有受影响字段的简化示例。

字段类型
relaxedJson
basicJson

二进制文件

{ "binary": { "$binary": { "base64": "gf1UcxdHTJ2HQ/EGQrO7mQ==", "subType": "00" }}}

{ "binary": "gf1UcxdHTJ2HQ/EGQrO7mQ=="}

Date

{ "date": { "$date": "2024-10-24T18:07:29.636Z"}}

{ "date": 1729625275856}

Decimal 数据类型

{ "decimal": { "$numberDecimal": "9.9" }}

{ "decimal": "9.9" }

时间戳

{ "timestamp": { "$timestamp": { "t": 1729793249, "i": 1 }}}

{ "timestamp": 1729793249000}

ObjectId

{ "_id": { "$oid": "671a8ce1497407eff0e17cba" }}

{ "_id": "6717fcbba18c8a8f74b6d977" }

负无穷大

{ "negInf": { "$numberDouble": "-Infinity" }}

{ "negInf": "-Infinity" }

正无穷大

{ "posInf": { "$numberDouble": "Infinity" }}

{ "posInf": "Infinity" }

正则表达式

{ "regex": { "$regularExpression": { "pattern": "ab+c", "options": "i" }}}

{ "regex": { "pattern": "ab+c", "options": "i" }}

UUID

{ "uuid": { "$binary": { "base64": "Kat+fHk6RkuAmotUmsU7gA==", "subType": "04" }}}

{ "uuid": "420b7ade-811a-4698-aa64-c8347c719cf1"}

$emit 必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$emit阶段。

每个流处理器只能写入一个 Atlas 时间序列集合。 如果您指定的集合不存在,Atlas 将使用您指定的时间序列字段创建该集合。 您必须指定一个现有数据库。

您可以使用动态表达式作为 topicdbcoll 字段的值,以启用流处理器能够逐条消息写入不同的目标。该表达式的计算结果必须为字符串。

例子

您有一个事务事件流,它会生成以下形式的消息:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

要将其中每一个分类到不同的Apache Kafka主题中,您可以写入以下 $emit 阶段:

{
"$emit": {
"connectionName": "kafka1",
"topic": "$customerStatus"
}
}

$emit阶段:

  • Very Important Industries消息写入名为VIP的主题。

  • N. E. Buddy消息写入名为employee的主题。

  • Khan Traktor消息写入名为contractor的主题。

有关动态表达式的更多信息,请参阅表达式操作符。

如果您指定的主题尚不存在,则Apache Kafka会在收到第一条指向该主题的消息时自动创建该主题。

如果您使用动态表达式指定主题,但 Atlas Stream Processing 无法评估给定消息的表达式,则 Atlas Stream Processing 会将该消息发送到死信队列 (DLQ)(如果已配置)并处理后续消息。如果未配置死信队列(DLQ),则 Atlas Stream Processing 会完全跳过该消息并处理后续消息。

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source 阶段与 Apache Kafka 代理建立连接,在名为 my_weatherdata 的主题中收集这些报告,从而在将每条记录引入后续聚合阶段时将其公开。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为 ingestionTime

  2. $match 阶段排除 airTemperature.value 大于或等于 30.0 的文档,并将 airTemperature.value 小于 30.0 的文档传递到下一阶段。

  3. $addFields 阶段用元数据扩充流。

  4. $emit 阶段会通过 weatherStreamOutput Kafka 代理连接将输出写入名为 stream 的主题。

{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata",
"tsFieldName": "ingestionTime"
}
},
{
"$match": {
"airTemperature.value": {
"$lt": 30
}
}
},
{
"$addFields": {
"processorMetadata": {
"$meta": "stream"
}
}
},
{
"$emit": {
"connectionName": "weatherStreamOutput",
"topic": "stream"
}
}

stream 主题中的文档采用以下形式:

{
"st": "x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8, 116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight": {
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime": {
"$date": "2024-09-26T17:34:41.843Z"
},
"_stream_meta": {
"source": {
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

后退

$tumblingWindow

在此页面上