定义
$emit 阶段在 连接注册表中指定要向其发送消息的连接。支持以下连接类型:
Apache Kafka 代理
AWS Kinesis数据流
AWS S3 存储桶
语法
Apache Kafka 代理
要将处理后的数据写入Apache Kafka代理,请使用具有以下原型形式的 $emit管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "topic": "<target-topic>" | <expression>, "config": { "acks": <number-of-acknowledgements>, "compression_type": "<compression-type>", "dateFormat": "default" | "ISO8601", "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<serialization-type>", "outputFormat": "<json-format>", "tombstoneWhen": <expression> } } }
$emit 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 | |||||
|---|---|---|---|---|---|---|---|---|
| 字符串 | 必需 | 从中提取数据的连接的名称,如连接注册表所示。 | |||||
| 字符串或表达式 | 必需 | 要向其发出消息的Apache Kafka主题的名称。 | |||||
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 | |||||
| int | Optional | ||||||
| 字符串 | Optional | 用于制造者生成的所有数据的压缩类型。默认值为“无”(即,不压缩)。有效值为:
压缩可用于完整批次的数据,因此批处理的效率会影响压缩比;批处理越多,压缩效果越好。 | |||||
| 字符串 | Optional | 日期值的日期格式。有效值为:
例如: 请考虑以下输入。 如果将 如果将 | |||||
| 表达式(expression) | Optional | 要添加到输出消息的标题。该表达式的计算结果必须为对象或数组。 如果表达式的计算结果为对象,Atlas Stream Processing 将根据该对象中的每个键值对构造一个标头,其中键是标头名称,值是标头值。 如果表达式的计算结果为数组,则它必须采用键值对对象数组的形式。例如: Atlas Stream Processing 从数组中的每个对象构造一个标头,其中键是标头名称,值是标头值。 Atlas Stream Processing 支持以下类型的标头值:
| |||||
| 对象 | 字符串 | 表达式 | Optional | 求值为 Apache Kafka 消息键的表达式。 如果您指定 | |||||
| 字符串 | 可选的 | 用于序列化Apache Kafka关键数据的数据类型。必须是以下值之一:
默认为 | |||||
| 字符串 | Optional | ||||||
| 表达式(expression) | Optional | 用于确定何时向Kafka发出 如果表达式无法计算出布尔值,或者无法计算, Atlas Stream Processing会将文档写入 DLQ。 如果您提供 |
Atlas 时间序列集合
要将处理后的数据写入 Atlas 时间序列集合,请使用具有以下原型形式的$emit管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "db": "<target-db>" | <expression>, "coll": "<target-coll>" | <expression>, "timeseries": { <options> } } }
$emit 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
|---|---|---|---|
| 字符串 | 必需 | 从中提取数据的连接的名称,如连接注册表所示。 |
| 字符串 |表达式 | 必需 | 解析为包含目标时间序列集合的Atlas数据库的名称或表达式。 |
| 字符串 |表达式 | 必需 | 解析为要写入的Atlas时间序列集合的名称或表达式。 |
| 文档 | 必需 | 定义集合的时间序列字段的文档。 |
注意
时间序列集合内文档的最大大小为4 MB。 要了解详情,请参阅时间序列集合限制。
AWS Kinesis
要将处理后的数据写入AWS Kinesis,请使用具有以下原型形式的$emit 管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "stream": "<stream-name>", "region": "<aws-region>", "partitionKey": "<key>" | <field> | <expression> "config": { "outputFormat": "<json-format>", "dateFormat": "default" | "ISO8601", } } }
$emit 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 | |||
|---|---|---|---|---|---|---|
| 字符串 | 必需 | 从中提取数据的连接的名称,如连接注册表所示。 | |||
| 字符串 | 必需 | 要连接的Kinesis数据流的名称。 | |||
| 字符串 | Optional | Kinesis Data Stream 运行的区域。 AWS 支持多个同名的流,每个流位于不同的地区。此参数允许Atlas Stream Processing区分此类流。 | |||
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 | |||
| 字符串 | Optional | 向Kinesis发送消息时使用的JSON格式。必须是以下值之一:
默认值为 | |||
| 字符串 | Optional | 日期值的日期格式。有效值为:
例如: 请考虑以下输入。 如果将 如果将 |
AWS S3
要将处理后的数据写入 AWS S3 存储桶的接收器连接,请使用 $emit 管道阶段,并采用以下原型形式:
{ "$emit": { "connectionName": "<registered-connection>", "bucket": "<target-bucket>", "region": "<target-region>", "path": "<key-prefix>" | <expression>, "config": { "writeOptions": { "count": <doc-count>, "bytes": <threshold>, "interval": { "size": <unit-count>, "unit": "<time-denomination>" } }, "delimiter": "<delimiter>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", "compression": "gzip" | "snappy", "compressionLevel": <level> } } }
$emit 阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 | |||
|---|---|---|---|---|---|---|
| 字符串 | 必需 | 从中写入数据的连接的名称,如连接注册表所示。 | |||
| 字符串 | 必需 | 要写入数据的 S3 存储桶的名称。 | |||
| 字符串 | Optional | 目标存储桶所在的 AWS地区的名称。如果您在 AWS地区托管流处理工作区,则此参数默认为该地区。否则,默认为距离流处理工作区托管地区最近的 AWS地区。 | |||
| 字符串 |表达式 | 必需 | 写入 S3 存储桶的对象键的前缀。必须是文字前缀字符串或计算结果为字符串的表达式。 | |||
| 文档 | Optional | 包含附加参数的文档,这些参数会覆盖各种默认值。 | |||
| 文档 | Optional | 包含控制写入行为的附加参数的文档。这些参数会根据首先满足的阈值触发写入行为。 例如,如果摄入的文档达到 | |||
| 整型 | Optional | 写入 S3 的每个文件中要分组的文档数量。 | |||
| 整型 | Optional | 指定在将文件写入 S3 之前必须累积的最小字节数。字节计数由管道摄取的 BSON 文档的大小决定,而不是最终输出文件的大小。 | |||
| 文档 | Optional | 指定用于批量写入文档的计时器,作为 默认为 1 分钟。您无法将任何 | |||
| 整型 | 可选的 |
默认为 | |||
| 字符串 | 可选的 | 用于计数批量写入计时器的时间单位。该参数支持以下值:
默认为 | |||
| 字符串 | Optional | 生成文件中每个条目之间的分隔符。 默认值为 | |||
| 字符串 | Optional | 指定写入 S3 的 JSON 输出格式。必须是以下值之一:
默认为“ 要了解更多信息,请参阅 JSON。 | |||
| 字符串 | Optional | 日期值的日期格式。有效值为:
例如,如果您将以下记录添加到管道中: 那么,如果将 如果将 | |||
| 字符串 | Optional | 要使用的压缩算法名称。必须是以下值之一:
| |||
| 字符串 | 可选的 | 应用于所发出消息的压缩级别。支持值包括 默认值为 此参数为必填项,且仅限用于 |
Basic JSON
为了简化消息摄取,Atlas Stream Processing 支持 Basic JSON,这简化了 RelaxedJSON 格式。下表展示了所有受影响字段的简化示例。
字段类型 | relaxedJson | basicJson |
|---|---|---|
二进制文件 |
|
|
Date |
|
|
Decimal 数据类型 |
|
|
时间戳 |
|
|
ObjectId |
|
|
负无穷大 |
|
|
正无穷大 |
|
|
正则表达式 |
|
|
UUID |
|
|
行为
$emit 必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$emit阶段。
每个流处理器只能写入一个 Atlas 时间序列集合。 如果您指定的集合不存在,Atlas 将使用您指定的时间序列字段创建该集合。 您必须指定一个现有数据库。
您可以使用动态表达式作为 topic、db 和 coll 字段的值,以启用流处理器能够逐条消息写入不同的目标。该表达式的计算结果必须为字符串。
例子
您有一个事务事件流,它会生成以下形式的消息:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
要将其中每一个分类到不同的Apache Kafka主题中,您可以写入以下 $emit 阶段:
{ "$emit": { "connectionName": "kafka1", "topic": "$customerStatus" } }
此$emit阶段:
将
Very Important Industries消息写入名为VIP的主题。将
N. E. Buddy消息写入名为employee的主题。将
Khan Traktor消息写入名为contractor的主题。
有关动态表达式的更多信息,请参阅表达式操作符。
如果您指定的主题尚不存在,则Apache Kafka会在收到第一条指向该主题的消息时自动创建该主题。
如果您使用动态表达式指定主题,但 Atlas Stream Processing 无法评估给定消息的表达式,则 Atlas Stream Processing 会将该消息发送到死信队列 (DLQ)(如果已配置)并处理后续消息。如果未配置死信队列(DLQ),则 Atlas Stream Processing 会完全跳过该消息并处理后续消息。
示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source阶段与 Apache Kafka 代理建立连接,在名为my_weatherdata的主题中收集这些报告,从而在将每条记录引入后续聚合阶段时将其公开。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为ingestionTime。$match阶段排除airTemperature.value大于或等于30.0的文档,并将airTemperature.value小于30.0的文档传递到下一阶段。$addFields阶段用元数据扩充流。$emit阶段会通过weatherStreamOutputKafka 代理连接将输出写入名为stream的主题。
{ "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } }, { "$match": { "airTemperature.value": { "$lt": 30 } } }, { "$addFields": { "processorMetadata": { "$meta": "stream" } } }, { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
stream 主题中的文档采用以下形式:
{ "st": "x+34700+119500", "position": { "type": "Point", "coordinates": [122.8, 116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。