Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
/ / /

$meta Stage de agregación (Stream Processing)

La expresión $meta devuelve un objeto que contiene todos los metadatos de transmisión para un documento. Puede exponer estos datos para todo el flujo, o bien para una de las siguientes etapas de agregación de Atlas Stream Processing:

  • $source

  • $hoppingWindow

  • $tumblingWindow

  • $https

Una expresión $meta tiene la siguiente forma de prototipo:

{ "$meta": <string> }
{
source: {
type: string,
ts: date,
source.topic: string
source.partition: int
source.offset: int
source.key: string|int|long|double|object|binData
source.headers: array[obj]
},
window: {
start: date,
end: date
},
https: {
url: string
method: string
httpStatusCode: int
responseTimeMs: int
}
}

La expresión $meta toma una única string como entrada, que corresponde a la ruta totalmente calificada en sintaxis de puntos de una fuente de metadatos. La raíz de esta ruta debe ser "stream". Puedes query los siguientes caminos:

ruta
Tipo
Descripción

stream

Objeto

Todos los metadatos de la etapa $source $https y cualquier etapa de ventana o etapa configurada en la canalización.

stream.source

Objeto

Todos los metadatos de la etapa $source.

stream.source.type

string

Tipo de conexión utilizada como fuente.

stream.source.ts

ISODate

Fecha y hora del registro en el momento de la ingestión.

stream.source.topic

string

Tema de Kafka del cual el flujo ingiere registros. Disponible sólo desde una fuente Kafka.

stream.source.partition

entero

Partición del tema de Kafka desde el que la secuencia ingiere registros. Disponible solo desde una fuente de Kafka.

stream.source.offset

entero

Orden de mensajes de seguimiento de desplazamiento y posición en cola dentro de una partición de origen de Kafka. Disponible solo desde una fuente de Kafka.

stream.source.key

string|int|long|double|objeto|binData

Clave asignada a mensajes de Kafka para particionamiento y distribución de la carga. Disponible solo desde una fuente de Kafka.

stream.source.headers

arreglo

Conjunto de pares clave-valor que describen los metadatos del mensaje de Kafka.

stream.window

Objeto

Todos los metadatos para la etapa $hoppingWindow o $tumblingWindow. Este objeto se establece sólo cuando un documento entra en una $hoppingWindow o $tumblingWindow etapa.

stream.window.start

ISODate

Hora de inicio de la ventana actual.

stream.window.end

ISODate

Hora de finalización de la ventana actual.

stream.https

Objeto

Todos los metadatos de la fase de $https. Este objeto solo se establece cuando una etapa $https genera un documento.

stream.https.url

string

URL desde la que el escenario $https obtiene datos.

stream.https.method

string

Método de solicitud HTTPS enviado a la URL.

stream.https.httpStatusCode

Int

Código de respuesta HTTP para la solicitud enviada a la URL.

stream.https.responseTimeMs

Int

Tiempo, en milisegundos, que tardó en recibir la respuesta de la URL.

La expresión Atlas Stream Processing $meta proporciona todas las funcionalidades del actual MongoDB $meta Expresión de agregación. Sin embargo, no se puede usar la funcionalidad específica de la versión Atlas Stream Processing de $meta en una consulta de agregación estándar de MongoDB.

El siguiente ejemplo enriquece la salida de un flujo con un arreglo de los temas fuente de Kafka desde los cuales se ingresaron los datos:

{
$source: {
connectionName: "kafka",
topic: ["t1", "t2", "t3"]
}
},
{
$emit: {
connectionName: "kafka",
topic: {
$concat: [
{
$meta: "stream.source.topic"
},
"out"
]
}
}
}

El siguiente ejemplo añade un campo al stream que indica la hora de inicio de cada ventana.

{
$source: {
connectionName: "kafka",
topic: "t1"
}
},
{
$hoppingWindow: . . .
},
{
$addFields: {
start: { $meta: "stream.window.start" }
}
}

Volver

$currentDate

En esta página