Menu Docs
Página inicial do Docs
/
Atlas
/ /

$meta Estágio de agregação (processamento de fluxo)

A expressão $meta retorna um objeto que contém todos os metadados de streaming de um documento. Você pode expor esses dados para todo o fluxo ou para um dos seguintes estágios de agregação do Atlas Stream Processing:

  • $source

  • $hoppingWindow

  • $tumblingWindow

  • $https

Uma expressão $meta tem o seguinte formato de protótipo:

{ "$meta": <string> }
{
source: {
type: string,
ts: date,
source.topic: string
source.partition: int
source.offset: int
source.key: string|int|long|double|object|binData
source.headers: array[obj]
},
window: {
start: date,
end: date
},
https: {
url: string
method: string
httpStatusCode: int
responseTimeMs: int
}
}

A expressão $meta recebe uma única entrada de string que corresponde ao caminho totalmente qualificado em sintaxe de pontos de uma fonte de metadados. A raiz deste caminho deve ser "stream". Você pode query os seguintes caminhos:

Caminho
Tipo
Descrição

stream

objeto

Todos os metadados para o estágio $source e qualquer estágio de janela ou estágio $https configurado no pipeline.

stream.source

objeto

Todos os metadados para o estágio $source.

stream.source.type

string

Tipo de conexão usada como origem.

stream.source.ts

Data ISO

Data e hora do registro no ponto de ingestão.

stream.source.topic

string

Tópico do Kafka do qual o fluxo ingere registros. Disponível apenas a partir de uma fonte Kafka.

stream.source.partition

inteiro

Partição do tópico do Kafka de onde o fluxo ingere registros. Disponível apenas a partir de uma fonte Kafka.

stream.source.offset

inteiro

Rastreamento de deslocamento da ordem das mensagens e posição na fila dentro de uma partição de origem do Kafka. Disponível apenas a partir de uma fonte Kafka.

stream.source.key

string|int|long|double|objeto|binData

Chave atribuída às mensagens do Kafka para particionamento e distribuição de carga. Disponível apenas a partir de uma fonte Kafka.

stream.source.headers

array

Conjunto de pares chave-valor que descrevem os metadados das mensagens do Kafka.

stream.window

objeto

Todos os metadados para o estágio $hoppingWindow ou $tumblingWindow. Este objeto é configurado apenas quando um documento entra em um estágio $hoppingWindow ou $tumblingWindow.

stream.window.start

Data ISO

Hora de início da janela atual.

stream.window.end

Data ISO

Hora de término da janela atual.

stream.https

objeto

Todos os metadados para o estágio $https. Este objeto é configurado apenas quando um estágio $https produz um documento.

stream.https.url

string

URL do qual o estágio $https obtém dados.

stream.https.method

string

Método de solicitação HTTPS enviado para o URL.

stream.https.httpStatusCode

int

Código de resposta HTTP para a solicitação enviada ao URL.

stream.https.responseTimeMs

int

Tempo, em milissegundos, que levou para receber a resposta da URL.

A expressão Atlas Stream Processing $meta oferece toda a funcionalidade da expressão de agregação existente do MongoDB $meta. No entanto, não é possível utilizar a funcionalidade específica da versão do Atlas Stream Processing de $meta em uma consulta de agregação padrão do MongoDB.

O exemplo a seguir enriquece a saída de um fluxo com um array dos tópicos de origem do Kafka dos quais os dados foram ingeridos:

{
$source: {
connectionName: “kafka”,
topic: [“t1”, “t2”, “t3”]
}
},
{
$emit: {
connectionName: “kafka”,
topic: {
$concat: [
{
$meta: “stream.source.topic”
},
“out"
]
}
}
}

O exemplo a seguir adiciona um campo ao fluxo que relata o relatório da hora de início de cada janela.

{
$source: {
connectionName: "kafka",
topic: "t1"
}
},
{
$hoppingWindow: . . .
},
{
$addFields: {
start: { $meta: "stream.window.start" }
}
}

Voltar

$currentDate

Nesta página