Definição
A expressão $meta retorna um objeto que contém todos os metadados de streaming de um documento. Você pode expor esses dados para todo o fluxo ou para um dos seguintes estágios de agregação do Atlas Stream Processing:
Uma expressão $meta tem o seguinte formato de protótipo:
{ "$meta": <string> }
"source": { "type": "<source-type>", "ts": { "$date": "<datetime>" }, "topic": "<string>", "partition": <int>, "offset": <int>, "key": "<kafka-key>", "headers": [ { "k": "<header-key>", "v": "<header-value>" } ], "operationType": "<db-operation>", "ns": { "db": "<namespace-db>", "coll": "<namespace-coll>" }, "documentKey": { "_id": { "$oid": "<object-id>" } }, "initialSync": { "phase": "<sync-state>" } "kinesisStream": "<kinesis-name>", "shardId": "<kinesis-shard-id>", "sequenceNumber": "<doc-uuid>", "partitionKey": "<partition-id>", } "window": { "start": <ISODate>, "end": <ISODate>, "partition": "<session-partition>" }, "https": { "url": "<target-url>", "method": "<request-method>", "httpStatusCode": <http-code>, "responseTimeMs": <response-time-ms> }
Sintaxe
A expressão $meta recebe uma única entrada de string que corresponde ao caminho totalmente qualificado em sintaxe de pontos de uma fonte de metadados. A raiz deste caminho deve ser "stream". Você pode query os seguintes caminhos:
Caminho | Tipo | Condicionalidade | Descrição |
|---|---|---|---|
| objeto | Sempre | |
| documento | Sempre | Documento contendo metadados para o |
| string | Sempre | Tipo de conexão usada como origem. |
| Data ISO | Sempre | Data e hora do registro no ponto de ingestão. |
| string | Condicional | Tópico Kafka do qual o stream ingere registros. Aplica-se apenas a uma fonte Kafka. |
| inteiro | Condicional | Partição do tópico Kafka do qual o stream ingere registros. Aplica-se apenas a uma fonte Kafka. |
| inteiro | Condicional | Deslocar a ordem das mensagens de rastreamento e a posição da fila em uma partição de origem Kafka. Aplica-se apenas a uma fonte Kafka. |
| corda | int | longo | duplo | objeto | binData | Condicional | Chave atribuída às mensagens Kafka para particionamento e distribuição de carga. Aplica-se apenas a uma fonte Kafka. |
| array | Condicional | Conjunto de pares de valores-chave que descrevem os metadados da mensagem Kafka. Aplica-se apenas a uma fonte Kafka. |
| string | Condicional | Tipo de operação de banco de dados que o Atlas Stream Processing tentou executar no documento fornecido. Aplica-se apenas a uma origem de change stream do Atlas . |
| documento | Condicional | Documento que contém o namespace do qual o Atlas Stream Processing origina os documentos. Aplica-se apenas a uma origem de change stream do Atlas . |
| string | Condicional | Nome do banco de dados no qual o Atlas Stream Processing tenta operações. Aplica-se apenas a uma origem de change stream do Atlas . Esse valor é o mesmo para todos os documentos de uma origem de Collection Change Streams ou Database Change Streams. Varia para uma origem do Cluster Change Stream. |
| string | Condicional | Nome da collection na qual o Atlas Stream Processing tenta fazer operações. Aplica-se apenas a uma origem de change stream do Atlas . Esse valor é o mesmo para todos os documentos de uma fonte de change stream. Varia para uma origem de change stream de banco de dados ou de change stream de cluster. |
| documento | Condicional | Documento contendo o ID do objeto do documento de origem. Aplica-se apenas a uma origem de change stream do Atlas . |
| string | Condicional | Estado atual da operação de sincronização inicial . Aplica-se somente a uma fonte de change stream do Atlas durante a sincronização inicial. |
| string | Condicional | Nome do Kinesis Data Stream do qual o Atlas Stream Processing fornece documentos. Aplica-se somente a uma origem AWS Kinesis. |
| string | Condicional | ID do shard dentro do Kinesis Data Stream do qual o Atlas Stream Processing obtém documentos. Aplica-se somente a uma origem AWS Kinesis. |
| string | Condicional | Identificador exclusivo do documento obtido no Kinesis Data Stream. Aplica-se somente a uma origem AWS Kinesis. |
| string | Condicional | Identificador único da partição à qual pertence o documento de origem. Aplica-se somente a uma origem AWS Kinesis. |
| documento | Condicional | Documento contendo metadados de janela. Aplica-se somente se o documento foi processado em uma janela. |
| Data ISO | Condicional | Tempo de janela aberta. Aplica-se somente se o documento foi processado em uma janela. |
| Data ISO | Condicional | Tempo de fechamento da janela. Aplica-se somente se o documento foi processado em uma janela. |
| string | Condicional | Partição da janela de sessão à qual o documento pertence. Aplica-se somente se o documento foi processado em uma janela de sessão. |
| documento | Condicional | |
| string | Condicional | URL de destino do estágio |
| string | Condicional | Método de solicitação HTTP usado pelo estágio |
| int | Condicional | Código de status da resposta HTTP da solicitação. Aplica-se somente quando a falha de processamento ocorreu no estágio |
| int | Condicional | Tempo de resposta da solicitação em milissegundos. Aplica-se somente quando a falha de processamento ocorreu no estágio |
Comportamento
A expressão $meta do Atlas Stream Processing fornece toda a funcionalidade da expressão de agregação existente do MongoDB $meta. No entanto, você não pode usar a funcionalidade específica para a versão do Atlas Stream Processing do $meta em uma consulta de agregação MongoDB padrão.
Exemplos
O exemplo a seguir enriquece a saída de um fluxo com um array dos tópicos de origem do Kafka dos quais os dados foram ingeridos:
{ $source: { connectionName: "kafka", topic: ["t1", "t2", "t3"] } }, { $emit: { connectionName: "kafka", topic: { $concat: [ { $meta: "stream.source.topic" }, "out" ] } } }
O exemplo a seguir adiciona um campo ao fluxo que relata o relatório da hora de início de cada janela.
{ $source: { connectionName: "kafka", topic: "t1" } }, { $hoppingWindow: . . . }, { $addFields: { start: { $meta: "stream.window.start" } } }