Docs 菜单
Docs 主页
/
Atlas
/ /

$meta 聚合阶段(流处理)

$meta表达式返回一个包含文档所有流媒体元数据的对象。您可以公开此数据以用于整个流,或以下 Atlas Stream Processing 聚合阶段之一:

  • $source

  • $hoppingWindow

  • $tumblingWindow

  • $https

$meta 表达式采用以下原型形式:

{ "$meta": <string> }
{
source: {
type: string,
ts: date,
source.topic: string
source.partition: int
source.offset: int
source.key: string|int|long|double|object|binData
source.headers: array[obj]
},
window: {
start: date,
end: date
},
https: {
url: string
method: string
httpStatusCode: int
responseTimeMs: int
}
}

$meta 表达式接收一个字符串输入,该字符串与元数据源的完全限定点语法路径相对应。此路径的根目录必须是 "stream"。您可以查询以下路径:

路径
类型
说明

stream

对象

所有 $source 阶段、任何窗口阶段或 $https 阶段在管道中配置的元数据。

stream.source

对象

$source 阶段的所有元数据。

stream.source.type

字符串

用作数据源的连接类型。

stream.source.ts

ISODate

记录在数据引入点的日期和时间。

stream.source.topic

字符串

流从中摄取记录的 Kafka 主题。仅可从 Kafka 源获取。

stream.source.partition

整型

流从中提取记录的 Kafka 主题分区。仅可从 Kafka 源获取。

stream.source.offset

整型

在 Kafka 源分区中跟踪消息顺序和队列位置的偏移量。仅可从 Kafka 源获取。

stream.source.key

string|int|long|double|object|binData

分配给 Kafka 消息的密钥,用于分区和负载分配。仅可从 Kafka 源获取。

stream.source.headers

阵列

描述 Kafka 消息元数据的键值对集合。

stream.window

对象

$hoppingWindow 阶段或 $tumblingWindow 阶段的所有元数据。仅在文档进入 $hoppingWindow$tumblingWindow 阶段时,才会设置该对象。

stream.window.start

ISODate

当前窗口的开始时间。

stream.window.end

ISODate

当前窗口的结束时间。

stream.https

对象

$https 阶段的所有元数据。只有当 $https 阶段输出文档时,才会设置该对象。

stream.https.url

字符串

获取 $https 阶段数据的 URL。

stream.https.method

字符串

发送到 URL 的 HTTPS 请求方法。

stream.https.httpStatusCode

int

发送到 URL 的请求的 HTTP 响应代码。

stream.https.responseTimeMs

int

从 URL 接收响应所需的时间,以毫秒为单位。

Atlas Stream Processing $meta 表达式提供了现有 MongoDB $meta 聚合表达式的所有功能。然而,您不能在标准的 MongoDB 聚合查询中使用 $meta 的 Atlas Stream Processing 版本特有的功能。

以下示例通过包含数据摄取来源的 Kafka 主题数组来扩充流的输出:

{
$source: {
connectionName: “kafka”,
topic: [“t1”, “t2”, “t3”]
}
},
{
$emit: {
connectionName: “kafka”,
topic: {
$concat: [
{
$meta: “stream.source.topic”
},
“out"
]
}
}
}

以下示例向流中添加一个字段,用于报告每个窗口的开始时间。

{
$source: {
connectionName: "kafka",
topic: "t1"
}
},
{
$hoppingWindow: . . .
},
{
$addFields: {
start: { $meta: "stream.window.start" }
}
}

后退

$currentDate

在此页面上