定义
$meta
表达式返回一个包含文档所有流媒体元数据的对象。您可以公开此数据以用于整个流,或以下 Atlas Stream Processing 聚合阶段之一:
$meta
表达式采用以下原型形式:
{ "$meta": <string> }
{ source: { type: string, ts: date, source.topic: string source.partition: int source.offset: int source.key: string|int|long|double|object|binData source.headers: array[obj] }, window: { start: date, end: date }, https: { url: string method: string httpStatusCode: int responseTimeMs: int } }
语法
$meta
表达式接收一个字符串输入,该字符串与元数据源的完全限定点语法路径相对应。此路径的根目录必须是 "stream"
。您可以查询以下路径:
路径 | 类型 | 说明 |
---|---|---|
| 对象 | |
| 对象 |
|
| 字符串 | 用作数据源的连接类型。 |
| ISODate | 记录在数据引入点的日期和时间。 |
| 字符串 | 流从中摄取记录的 Kafka 主题。仅可从 Kafka 源获取。 |
| 整型 | 流从中提取记录的 Kafka 主题分区。仅可从 Kafka 源获取。 |
| 整型 | 在 Kafka 源分区中跟踪消息顺序和队列位置的偏移量。仅可从 Kafka 源获取。 |
| string|int|long|double|object|binData | 分配给 Kafka 消息的密钥,用于分区和负载分配。仅可从 Kafka 源获取。 |
| 阵列 | 描述 Kafka 消息元数据的键值对集合。 |
| 对象 |
|
| ISODate | 当前窗口的开始时间。 |
| ISODate | 当前窗口的结束时间。 |
| 对象 | |
| 字符串 | 获取 |
| 字符串 | 发送到 URL 的 HTTPS 请求方法。 |
| int | 发送到 URL 的请求的 HTTP 响应代码。 |
| int | 从 URL 接收响应所需的时间,以毫秒为单位。 |
行为
Atlas Stream Processing $meta
表达式提供了现有 MongoDB $meta
聚合表达式的所有功能。然而,您不能在标准的 MongoDB 聚合查询中使用 $meta
的 Atlas Stream Processing 版本特有的功能。
示例
以下示例通过包含数据摄取来源的 Kafka 主题数组来扩充流的输出:
{ $source: { connectionName: “kafka”, topic: [“t1”, “t2”, “t3”] } }, { $emit: { connectionName: “kafka”, topic: { $concat: [ { $meta: “stream.source.topic” }, “out" ] } } }
以下示例向流中添加一个字段,用于报告每个窗口的开始时间。
{ $source: { connectionName: "kafka", topic: "t1" } }, { $hoppingWindow: . . . }, { $addFields: { start: { $meta: "stream.window.start" } } }