Definición
La expresión $meta devuelve un objeto que contiene todos los metadatos de transmisión de un documento. Puede exponer estos datos para toda la transmisión o para una de las siguientes etapas de agregación de Atlas Stream Processing:
Una expresión $meta tiene la siguiente forma prototipo:
{ "$meta": <string> }
{ source: { type: string, ts: date, source.topic: string source.partition: int source.offset: int source.key: string|int|long|double|object|binData source.headers: array[obj] }, window: { start: date, end: date }, https: { url: string method: string httpStatusCode: int responseTimeMs: int } }
Sintaxis
La expresión $meta toma una sola cadena de entrada que corresponde a la ruta completa, con sintaxis de punto, de una fuente de metadatos. La raíz de esta ruta debe ser "stream". Puede consultar las siguientes rutas:
Ruta de acceso | Tipo | Descripción |
|---|---|---|
| Objeto | |
| Objeto | Todos los metadatos de la |
| string | Tipo de conexión utilizada como fuente. |
| ISODate | Fecha y hora del registro en el punto de ingestión. |
| string | Tema de Kafka del que la secuencia ingiere registros. Disponible solo desde una fuente de Kafka. |
| entero | Partición del tema de Kafka desde el que la secuencia ingiere registros. Disponible solo desde una fuente de Kafka. |
| entero | Orden de mensajes de seguimiento de desplazamiento y posición en cola dentro de una partición de origen de Kafka. Disponible solo desde una fuente de Kafka. |
| cadena|int|long|double|objeto|binData | Clave asignada a los mensajes de Kafka para la partición y distribución de carga. Disponible solo desde una fuente de Kafka. |
| arreglo | Conjunto de pares clave-valor que describen los metadatos de los mensajes de Kafka. |
| Objeto | Todos los metadatos de la etapa o. Este objeto solo se |
| ISODate | Hora de inicio de la ventana actual. |
| ISODate | Hora de finalización de la ventana actual. |
| Objeto | |
| string | URL desde la cual la |
| string | Método de solicitud HTTPS enviado a la URL. |
| Int | Código de respuesta HTTP para la solicitud enviada a la URL. |
| Int | Tiempo, en milisegundos, que tardó en recibir la respuesta de la URL. |
Comportamiento
La expresión Atlas Stream Processing $meta proporciona toda la funcionalidad del MongoDB existente $meta Expresión de agregación. Sin embargo, no se puede usar la funcionalidad específica de la versión Atlas Stream Processing de $meta en una consulta de agregación estándar de MongoDB.
Ejemplos
El siguiente ejemplo enriquece la salida de una transmisión con una matriz de temas de origen de Kafka desde donde se ingirieron los datos:
{ $source: { connectionName: “kafka”, topic: [“t1”, “t2”, “t3”] } }, { $emit: { connectionName: “kafka”, topic: { $concat: [ { $meta: “stream.source.topic” }, “out" ] } } }
El siguiente ejemplo agrega un campo a la secuencia que informa la hora de inicio de cada ventana.
{ $source: { connectionName: "kafka", topic: "t1" } }, { $hoppingWindow: . . . }, { $addFields: { start: { $meta: "stream.window.start" } } }