Docs 菜单
Docs 主页
/
MongoDB Atlas
/ /

$emit

在此页面上

  • 定义
  • 语法
  • Apache Kafka 代理
  • Atlas 时间序列集合
  • 行为
  • 示例

$emit阶段指定 连接注册表 中要向其发送消息的连接。连接必须是 Apache Kafka 代理或 时间序列集合。

将处理后的数据写入 Apache Kafka $emit代理,使用具有以下原型形式的 管道阶段:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明
connectionName
字符串
必需
名称(显示在连接注册表中)从中引入数据的连接的名称。
topic
字符串 |表达式
必需
Apache Kafka 的名称 要向其发送消息的主题。
config
文档
Optional
包含可覆盖各种默认值的字段的文档。
config.headers
表达式(expression)
Optional

要添加到输出消息的标头。该表达式的计算结果必须为对象或数组。

如果表达式的计算结果为一个对象,Atlas Stream Processing 会根据该对象中的每个键值对构造一个标头,其中键是标头名称,值是标头值。

如果表达式的计算结果为数组,则必须采用键值对对象数组的形式。例如:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing 从数组中的每个对象构造标头,其中键是标头名称,值是标头值。

Atlas Stream Processing 支持以下类型的标头值:

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key
对象 |字符串
Optional

计算结果为 Apache Kafka 的表达式 消息键。

如果指定config.key ,则必须指定config.keyFormat

config.keyFormat
字符串
可选的

用于反序列化 Apache Kafka 的数据类型 关键数据。必须是以下值之一:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

默认为binData 。如果指定config.key ,则必须指定config.keyFormat 。如果文档的config.key未成功反序列化为指定的数据类型,Atlas Stream Processing 会将其发送到死信队列。

config.outputFormat
字符串
Optional

向 Apache Kafka 发送消息时使用的 JSON 格式 。必须是以下值之一:

  • "relaxedJson"

  • "canonicalJson"

默认值为 "relaxedJson"

要将处理后的数据写入 Atlas 时间序列集合,请使用具有以下原型形式的$emit管道阶段:

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

$emit 阶段采用包含以下字段的文档:

字段
类型
必要性
说明
connectionName
字符串
必需
名称(显示在连接注册表中)从中引入数据的连接的名称。
db
字符串
必需
包含目标时间序列集合的 Atlas 数据库的名称。
coll
字符串
必需
要写入的 Atlas 时间序列集合的名称。
timeseries
文档
必需
定义集合的时间序列字段的文档。

注意

时间序列集合内文档的最大大小为4 MB。 要了解详情,请参阅时间序列集合限制。

$emit 必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$emit阶段。

每个流处理器只能写入一个 Atlas 时间序列集合。 如果您指定的集合不存在,Atlas 将使用您指定的时间序列字段创建该集合。 您必须指定一个现有数据库。

您可以使用 动态表达式 作为topic 字段的值,以使流处理器能够写入不同的目标 Apache Kafka 逐条消息的主题。该表达式的计算结果必须为字符串。

例子

您有一个事务事件流,它会生成以下形式的消息:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

要将其中每一个分类到不同的 Apache Kafka 主题,您可以编写以下$emit 阶段:

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

$emit阶段:

  • Very Important Industries消息写入名为VIP的主题。

  • N. E. Buddy消息写入名为employee的主题。

  • Khan Traktor消息写入名为contractor的主题。

有关动态表达式的更多信息,请参阅表达式操作符。

如果您指定的主题尚不存在, Apache Kafka 当它收到第一条针对该主题的消息时,会自动创建该主题。

如果您使用动态表达式指定主题,但 Atlas Stream Processing 无法评估给定消息的表达式,则 Atlas Stream Processing 会将该消息发送到死信队列(如果已配置)并处理后续消息。如果没有配置死信队列,Atlas Stream Processing 会完全跳过该消息并处理后续消息。

流数据源从不同位置生成详细的天气报告,符合样本天气数据集的模式。以下聚合分为三个阶段:

  1. $source阶段与 Apache Kafka 建立连接 代理会在名为my_weatherdata 的主题中收集这些报告,从而在将每条记录摄取到后续聚合阶段时将其公开。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为ingestionTime

  2. $match阶段会排除airTemperature.value大于或等于30.0的文档,并将airTemperature.value小于30.0的文档传递到下一阶段。

  3. $emit阶段通过weatherStreamOutput Kafka代理连接将输出写入名为stream的主题。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'airTemperature.value': { '$lt': 30 } } },
{
'$emit': {
connectionName: 'weatherStreamOutput',
topic: 'stream'
}
}

stream主题中的文档采用以下形式:

{
"st":"x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8,116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1","AG1","UG1","SA1","MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight":{
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime":{
"$date":"2024-09-26T17:34:41.843Z"
},
"_stream_meta":{
"source":{
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

注意

以上是一个具有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

后退

$tumbleWindow