Definición
La expresión $meta devuelve un objeto que contiene todos los metadatos de transmisión para un documento. Puede exponer estos datos para todo el flujo, o bien para una de las siguientes etapas de agregación de Atlas Stream Processing:
Una expresión $meta tiene la siguiente forma de prototipo:
{ "$meta": <string> }
{ source: { type: string, ts: date, source.topic: string source.partition: int source.offset: int source.key: string|int|long|double|object|binData source.headers: array[obj] }, window: { start: date, end: date }, https: { url: string method: string httpStatusCode: int responseTimeMs: int } }
Sintaxis
La expresión $meta toma una única string como entrada, que corresponde a la ruta totalmente calificada en sintaxis de puntos de una fuente de metadatos. La raíz de esta ruta debe ser "stream". Puedes query los siguientes caminos:
ruta | Tipo | Descripción |
|---|---|---|
| Objeto | |
| Objeto | Todos los metadatos de la etapa |
| string | Tipo de conexión utilizada como fuente. |
| ISODate | Fecha y hora del registro en el momento de la ingestión. |
| string | Tema de Kafka del cual el flujo ingiere registros. Disponible sólo desde una fuente Kafka. |
| entero | Partición del tema de Kafka desde el que la secuencia ingiere registros. Disponible solo desde una fuente de Kafka. |
| entero | Orden de mensajes de seguimiento de desplazamiento y posición en cola dentro de una partición de origen de Kafka. Disponible solo desde una fuente de Kafka. |
| string|int|long|double|objeto|binData | Clave asignada a mensajes de Kafka para particionamiento y distribución de la carga. Disponible solo desde una fuente de Kafka. |
| arreglo | Conjunto de pares clave-valor que describen los metadatos del mensaje de Kafka. |
| Objeto | Todos los metadatos para la etapa |
| ISODate | Hora de inicio de la ventana actual. |
| ISODate | Hora de finalización de la ventana actual. |
| Objeto | |
| string | URL desde la que el escenario |
| string | Método de solicitud HTTPS enviado a la URL. |
| Int | Código de respuesta HTTP para la solicitud enviada a la URL. |
| Int | Tiempo, en milisegundos, que tardó en recibir la respuesta de la URL. |
Comportamiento
La expresión Atlas Stream Processing $meta proporciona todas las funcionalidades del actual MongoDB $meta Expresión de agregación. Sin embargo, no se puede usar la funcionalidad específica de la versión Atlas Stream Processing de $meta en una consulta de agregación estándar de MongoDB.
Ejemplos
El siguiente ejemplo enriquece la salida de un flujo con un arreglo de los temas fuente de Kafka desde los cuales se ingresaron los datos:
{ $source: { connectionName: "kafka", topic: ["t1", "t2", "t3"] } }, { $emit: { connectionName: "kafka", topic: { $concat: [ { $meta: "stream.source.topic" }, "out" ] } } }
El siguiente ejemplo añade un campo al stream que indica la hora de inicio de cada ventana.
{ $source: { connectionName: "kafka", topic: "t1" } }, { $hoppingWindow: . . . }, { $addFields: { start: { $meta: "stream.window.start" } } }