Docs Menu
Docs Home
/ /
/ / /

$meta Stage de agregación (Stream Processing)

La expresión $meta devuelve un objeto que contiene todos los metadatos de transmisión de un documento. Puede exponer estos datos para toda la transmisión o para una de las siguientes etapas de agregación de Atlas Stream Processing:

  • $source

  • $hoppingWindow

  • $tumblingWindow

  • $https

Una expresión $meta tiene la siguiente forma prototipo:

{ "$meta": <string> }
{
source: {
type: string,
ts: date,
source.topic: string
source.partition: int
source.offset: int
source.key: string|int|long|double|object|binData
source.headers: array[obj]
},
window: {
start: date,
end: date
},
https: {
url: string
method: string
httpStatusCode: int
responseTimeMs: int
}
}

La expresión $meta toma una sola cadena de entrada que corresponde a la ruta completa, con sintaxis de punto, de una fuente de metadatos. La raíz de esta ruta debe ser "stream". Puede consultar las siguientes rutas:

Ruta de acceso
Tipo
Descripción

stream

Objeto

Todos los metadatos de la etapa $source $https y cualquier etapa de ventana o etapa configurada en la canalización.

stream.source

Objeto

Todos los metadatos de la $source etapa.

stream.source.type

string

Tipo de conexión utilizada como fuente.

stream.source.ts

ISODate

Fecha y hora del registro en el punto de ingestión.

stream.source.topic

string

Tema de Kafka del que la secuencia ingiere registros. Disponible solo desde una fuente de Kafka.

stream.source.partition

entero

Partición del tema de Kafka desde el que la secuencia ingiere registros. Disponible solo desde una fuente de Kafka.

stream.source.offset

entero

Orden de mensajes de seguimiento de desplazamiento y posición en cola dentro de una partición de origen de Kafka. Disponible solo desde una fuente de Kafka.

stream.source.key

cadena|int|long|double|objeto|binData

Clave asignada a los mensajes de Kafka para la partición y distribución de carga. Disponible solo desde una fuente de Kafka.

stream.source.headers

arreglo

Conjunto de pares clave-valor que describen los metadatos de los mensajes de Kafka.

stream.window

Objeto

Todos los metadatos de la etapa o. Este objeto solo se $hoppingWindow establece cuando un documento entra en $tumblingWindow $hoppingWindow $tumblingWindow la etapa o.

stream.window.start

ISODate

Hora de inicio de la ventana actual.

stream.window.end

ISODate

Hora de finalización de la ventana actual.

stream.https

Objeto

Todos los metadatos de la etapa. Este objeto $https $https solo se establece cuando una etapa genera un documento.

stream.https.url

string

URL desde la cual la $https etapa obtiene los datos.

stream.https.method

string

Método de solicitud HTTPS enviado a la URL.

stream.https.httpStatusCode

Int

Código de respuesta HTTP para la solicitud enviada a la URL.

stream.https.responseTimeMs

Int

Tiempo, en milisegundos, que tardó en recibir la respuesta de la URL.

La expresión Atlas Stream Processing $meta proporciona toda la funcionalidad del MongoDB existente $meta Expresión de agregación. Sin embargo, no se puede usar la funcionalidad específica de la versión Atlas Stream Processing de $meta en una consulta de agregación estándar de MongoDB.

El siguiente ejemplo enriquece la salida de una transmisión con una matriz de temas de origen de Kafka desde donde se ingirieron los datos:

{
$source: {
connectionName: “kafka”,
topic: [“t1”, “t2”, “t3”]
}
},
{
$emit: {
connectionName: “kafka”,
topic: {
$concat: [
{
$meta: “stream.source.topic”
},
“out"
]
}
}
}

El siguiente ejemplo agrega un campo a la secuencia que informa la hora de inicio de cada ventana.

{
$source: {
connectionName: "kafka",
topic: "t1"
}
},
{
$hoppingWindow: . . .
},
{
$addFields: {
start: { $meta: "stream.window.start" }
}
}

Volver

$currentDate

En esta página