定义
$emit
阶段在 连接注册表中指定要向其发送消息的连接。该连接必须是 Apache Kafka 代理或时间序列集合。
语法
Apache Kafka 代理
要将处理后的数据写入Apache Kafka代理,请使用具有以下原型形式的 $emit
管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "topic": "<target-topic>" | <expression>, "config": { "acks": <number-of-acknowledgements>, "compression_type": "<compression-type>", "dateFormat": "default" | "ISO8601", "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<serialization-type>", "outputFormat": "<json-format>", "tombstoneWhen": <expression> } } }
$emit
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 | |||||
---|---|---|---|---|---|---|---|---|
| 字符串 | 必需 | 从中提取数据的连接的名称,如连接注册表所示。 | |||||
| 字符串 |表达式 | 必需 | 要向其发出消息的Apache Kafka主题的名称。 | |||||
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 | |||||
| int | Optional | ||||||
| 字符串 | Optional | 用于制造者生成的所有数据的压缩类型。默认值为“无”(即,不压缩)。有效值为:
压缩可用于完整批次的数据,因此批处理的效率会影响压缩比;批处理越多,压缩效果越好。 | |||||
| 字符串 | Optional | 日期值的日期格式。有效值为:
例如: 请考虑以下输入。
如果将
如果将
| |||||
| 表达式(expression) | Optional | 要添加到输出消息的标题。该表达式的计算结果必须为对象或数组。 如果表达式的计算结果为对象,Atlas Stream Processing 将根据该对象中的每个键值对构造一个标头,其中键是标头名称,值是标头值。 如果表达式的计算结果为数组,则它必须采用键值对对象数组的形式。例如:
Atlas Stream Processing 从数组中的每个对象构造一个标头,其中键是标头名称,值是标头值。 Atlas Stream Processing 支持以下类型的标头值:
| |||||
| 对象 | 字符串 | 表达式 | Optional | 求值为 Apache Kafka 消息键的表达式。 如果您指定 | |||||
| 字符串 | 可选的 | 用于序列化Apache Kafka关键数据的数据类型。必须是以下值之一:
默认为 | |||||
| 字符串 | Optional | ||||||
| 表达式(expression) | Optional | 用于确定何时向Kafka发出 如果表达式无法计算出布尔值,或者无法计算, Atlas Stream Processing会将文档写入 DLQ。 如果您提供 |
Atlas 时间序列集合
要将处理后的数据写入 Atlas 时间序列集合,请使用具有以下原型形式的$emit
管道阶段:
{ "$emit": { "connectionName": "<registered-connection>", "db": "<target-db>" | <expression>, "coll": "<target-coll>" | <expression>, "timeseries": { <options> } } }
$emit
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
| 字符串 | 必需 | 从中提取数据的连接的名称,如连接注册表所示。 |
| 字符串 |表达式 | 必需 | 解析为包含目标时间序列集合的Atlas数据库的名称或表达式。 |
| 字符串 |表达式 | 必需 | 解析为要写入的Atlas时间序列集合的名称或表达式。 |
| 文档 | 必需 | 定义集合的时间序列字段的文档。 |
注意
时间序列集合内文档的最大大小为4 MB。 要了解详情,请参阅时间序列集合限制。
AWS S3
要将处理后的数据写入 AWS S3 存储桶的接收器连接,请使用 $emit
管道阶段,并采用以下原型形式:
{ "$emit": { "connectionName": "<registered-connection>", "bucket": "<target-bucket>", "region": "<target-region>", "path": "<key-prefix>" | <expression>, "config": { "writeOptions": { "count": <doc-count>, "bytes": <threshold>, "interval": { "size": <unit-count>, "unit": "<time-denomination>" } }, "delimiter": "<delimiter>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", "compression": "gzip" | "snappy", "compressionLevel": <level> } } }
$emit
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 | |||
---|---|---|---|---|---|---|
| 字符串 | 必需 | 从中写入数据的连接的名称,如连接注册表所示。 | |||
| 字符串 | 必需 | 要写入数据的 S3 存储桶的名称。 | |||
| 字符串 | Optional | 目标存储桶所在的 AWS 区域的名称。如果您在 AWS 区域托管流处理实例,此参数将默认为该区域。否则,它会默认选择距离流处理实例托管区域最近的 AWS 区域。 | |||
| 字符串 |表达式 | 必需 | 写入 S3 存储桶的对象键的前缀。必须是文字前缀字符串或计算结果为字符串的表达式。 | |||
| 文档 | Optional | 包含附加参数的文档,这些参数会覆盖各种默认值。 | |||
| 文档 | Optional | 包含控制写入行为的附加参数的文档。这些参数会根据首先满足的阈值触发写入行为。 例如,如果摄入的文档达到 | |||
| 整型 | Optional | 写入 S3 的每个文件中要分组的文档数量。 | |||
| 整型 | Optional | 指定在将文件写入 S3 之前必须累积的最小字节数。字节计数由管道摄取的 BSON 文档的大小决定,而不是最终输出文件的大小。 | |||
| 文档 | Optional | 指定用于批量写入文档的计时器,作为 默认为 1 分钟。您无法将任何 | |||
| 整型 | 可选的 |
默认为 | |||
| 字符串 | 可选的 | 用于计数批量写入计时器的时间单位。该参数支持以下值:
默认为 | |||
| 字符串 | Optional | 生成文件中每个条目之间的分隔符。 默认值为 | |||
| 字符串 | Optional | 指定写入 S3 的 JSON 输出格式。必须是以下值之一:
默认为“ 要了解更多信息,请参阅 JSON。 | |||
| 字符串 | Optional | 日期值的日期格式。有效值为:
例如,如果您将以下记录添加到管道中:
那么,如果将
如果将
| |||
| 字符串 | Optional | 要使用的压缩算法名称。必须是以下值之一:
| |||
| 字符串 | 可选的 | 应用于所发出消息的压缩级别。支持值包括 默认值为 此参数为必填项,且仅限用于 |
Basic JSON
为了简化消息摄取,Atlas Stream Processing 支持 Basic JSON,这简化了 RelaxedJSON 格式。下表展示了所有受影响字段的简化示例。
字段类型 | relaxedJson | basicJson |
---|---|---|
二进制文件 |
|
|
Date |
|
|
Decimal 数据类型 |
|
|
时间戳 |
|
|
ObjectId |
|
|
负无穷大 |
|
|
正无穷大 |
|
|
正则表达式 |
|
|
UUID |
|
|
行为
$emit
必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$emit
阶段。
每个流处理器只能写入一个 Atlas 时间序列集合。 如果您指定的集合不存在,Atlas 将使用您指定的时间序列字段创建该集合。 您必须指定一个现有数据库。
您可以使用动态表达式作为 topic
、db
和 coll
字段的值,以启用流处理器能够逐条消息写入不同的目标。该表达式的计算结果必须为字符串。
例子
您有一个事务事件流,它会生成以下形式的消息:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
要将其中每一个分类到不同的Apache Kafka主题中,您可以写入以下 $emit
阶段:
{ "$emit": { "connectionName": "kafka1", "topic": "$customerStatus" } }
此$emit
阶段:
将
Very Important Industries
消息写入名为VIP
的主题。将
N. E. Buddy
消息写入名为employee
的主题。将
Khan Traktor
消息写入名为contractor
的主题。
有关动态表达式的更多信息,请参阅表达式操作符。
如果您指定的主题尚不存在,则Apache Kafka会在收到第一条指向该主题的消息时自动创建该主题。
如果您使用动态表达式指定主题,但 Atlas Stream Processing 无法评估给定消息的表达式,则 Atlas Stream Processing 会将该消息发送到死信队列 (DLQ)(如果已配置)并处理后续消息。如果未配置死信队列(DLQ),则 Atlas Stream Processing 会完全跳过该消息并处理后续消息。
示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source
阶段与 Apache Kafka 代理建立连接,在名为my_weatherdata
的主题中收集这些报告,从而在将每条记录引入后续聚合阶段时将其公开。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为ingestionTime
。$match
阶段排除airTemperature.value
大于或等于30.0
的文档,并将airTemperature.value
小于30.0
的文档传递到下一阶段。$addFields
阶段用元数据扩充流。$emit
阶段会通过weatherStreamOutput
Kafka 代理连接将输出写入名为stream
的主题。
{ "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } }, { "$match": { "airTemperature.value": { "$lt": 30 } } }, { "$addFields": { "processorMetadata": { "$meta": "stream" } } }, { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
stream
主题中的文档采用以下形式:
{ "st": "x+34700+119500", "position": { "type": "Point", "coordinates": [122.8, 116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" }, "_stream_meta": { "source": { "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。