Definição
A expressão $meta
retorna um objeto que contém todos os metadados de streaming de um documento. Você pode expor esses dados para todo o fluxo ou para um dos seguintes estágios de agregação do Atlas Stream Processing:
Uma expressão $meta
tem o seguinte formato de protótipo:
{ "$meta": <string> }
{ source: { type: string, ts: date, source.topic: string source.partition: int source.offset: int source.key: string|int|long|double|object|binData source.headers: array[obj] }, window: { start: date, end: date }, https: { url: string method: string httpStatusCode: int responseTimeMs: int } }
Sintaxe
A expressão $meta
recebe uma única entrada de string que corresponde ao caminho totalmente qualificado em sintaxe de pontos de uma fonte de metadados. A raiz deste caminho deve ser "stream"
. Você pode query os seguintes caminhos:
Caminho | Tipo | Descrição |
---|---|---|
| objeto | |
| objeto | Todos os metadados para o estágio |
| string | Tipo de conexão usada como origem. |
| Data ISO | Data e hora do registro no ponto de ingestão. |
| string | Tópico do Kafka do qual o fluxo ingere registros. Disponível apenas a partir de uma fonte Kafka. |
| inteiro | Partição do tópico do Kafka de onde o fluxo ingere registros. Disponível apenas a partir de uma fonte Kafka. |
| inteiro | Rastreamento de deslocamento da ordem das mensagens e posição na fila dentro de uma partição de origem do Kafka. Disponível apenas a partir de uma fonte Kafka. |
| string|int|long|double|objeto|binData | Chave atribuída às mensagens do Kafka para particionamento e distribuição de carga. Disponível apenas a partir de uma fonte Kafka. |
| array | Conjunto de pares chave-valor que descrevem os metadados das mensagens do Kafka. |
| objeto | Todos os metadados para o estágio |
| Data ISO | Hora de início da janela atual. |
| Data ISO | Hora de término da janela atual. |
| objeto | |
| string | URL do qual o estágio |
| string | Método de solicitação HTTPS enviado para o URL. |
| int | Código de resposta HTTP para a solicitação enviada ao URL. |
| int | Tempo, em milissegundos, que levou para receber a resposta da URL. |
Comportamento
A expressão Atlas Stream Processing $meta
oferece toda a funcionalidade da expressão de agregação existente do MongoDB $meta
. No entanto, não é possível utilizar a funcionalidade específica da versão do Atlas Stream Processing de $meta
em uma consulta de agregação padrão do MongoDB.
Exemplos
O exemplo a seguir enriquece a saída de um fluxo com um array dos tópicos de origem do Kafka dos quais os dados foram ingeridos:
{ $source: { connectionName: “kafka”, topic: [“t1”, “t2”, “t3”] } }, { $emit: { connectionName: “kafka”, topic: { $concat: [ { $meta: “stream.source.topic” }, “out" ] } } }
O exemplo a seguir adiciona um campo ao fluxo que relata o relatório da hora de início de cada janela.
{ $source: { connectionName: "kafka", topic: "t1" } }, { $hoppingWindow: . . . }, { $addFields: { start: { $meta: "stream.window.start" } } }