Para agentes de IA: um índice de documentação está disponível em https://www.mongodb.com/pt-br/docs/llms.txt — as versões de markdown de todas as páginas estão disponíveis anexando .md a qualquer caminho de URL.
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Menu Docs

$meta Estágio de agregação (processamento de fluxo)

A expressão $meta retorna um objeto que contém todos os metadados de streaming de um documento. Você pode expor esses dados para todo o fluxo ou para um dos seguintes estágios de agregação do Atlas Stream Processing:

Uma expressão $meta tem o seguinte formato de protótipo:

{ "$meta": <string> }
"source": {
"type": "<source-type>",
"ts": {
"$date": "<datetime>"
},
"topic": "<string>",
"partition": <int>,
"offset": <int>,
"key": "<kafka-key>",
"headers": [
{
"k": "<header-key>",
"v": "<header-value>"
}
],
"operationType": "<db-operation>",
"ns": {
"db": "<namespace-db>",
"coll": "<namespace-coll>"
},
"documentKey": {
"_id": {
"$oid": "<object-id>"
}
},
"initialSync": {
"phase": "<sync-state>"
}
"kinesisStream": "<kinesis-name>",
"shardId": "<kinesis-shard-id>",
"sequenceNumber": "<doc-uuid>",
"partitionKey": "<partition-id>",
}
"window": {
"start": <ISODate>,
"end": <ISODate>,
"partition": "<session-partition>"
},
"https": {
"url": "<target-url>",
"method": "<request-method>",
"httpStatusCode": <http-code>,
"responseTimeMs": <response-time-ms>
}

A expressão $meta recebe uma única entrada de string que corresponde ao caminho totalmente qualificado em sintaxe de pontos de uma fonte de metadados. A raiz deste caminho deve ser "stream". Você pode query os seguintes caminhos:

Caminho
Tipo
Condicionalidade
Descrição

stream

objeto

Sempre

Todos os metadados para o estágio $source e qualquer estágio de janela ou estágio $https configurado no pipeline.

stream.source

documento

Sempre

Documento contendo metadados para o $source estágio.

stream.source.type

string

Sempre

Tipo de conexão usada como origem.

stream.source.ts

Data ISO

Sempre

Data e hora do registro no ponto de ingestão.

stream.source.topic

string

Condicional

Tópico Kafka do qual o stream ingere registros. Aplica-se apenas a uma fonte Kafka.

stream.source.partition

inteiro

Condicional

Partição do tópico Kafka do qual o stream ingere registros. Aplica-se apenas a uma fonte Kafka.

stream.source.offset

inteiro

Condicional

Deslocar a ordem das mensagens de rastreamento e a posição da fila em uma partição de origem Kafka. Aplica-se apenas a uma fonte Kafka.

stream.source.key

corda | int | longo | duplo | objeto | binData

Condicional

Chave atribuída às mensagens Kafka para particionamento e distribuição de carga. Aplica-se apenas a uma fonte Kafka.

stream.source.headers

array

Condicional

Conjunto de pares de valores-chave que descrevem os metadados da mensagem Kafka. Aplica-se apenas a uma fonte Kafka.

stream.source.operationType

string

Condicional

Tipo de operação de banco de dados que o Atlas Stream Processing tentou executar no documento fornecido. Aplica-se apenas a uma origem de change stream do Atlas .

stream.source.ns

documento

Condicional

Documento que contém o namespace do qual o Atlas Stream Processing origina os documentos. Aplica-se apenas a uma origem de change stream do Atlas .

stream.source.ns.db

string

Condicional

Nome do banco de dados no qual o Atlas Stream Processing tenta operações. Aplica-se apenas a uma origem de change stream do Atlas .

Esse valor é o mesmo para todos os documentos de uma origem de Collection Change Streams ou Database Change Streams. Varia para uma origem do Cluster Change Stream.

stream.source.ns.coll

string

Condicional

Nome da collection na qual o Atlas Stream Processing tenta fazer operações. Aplica-se apenas a uma origem de change stream do Atlas .

Esse valor é o mesmo para todos os documentos de uma fonte de change stream. Varia para uma origem de change stream de banco de dados ou de change stream de cluster.

stream.source.documentKey._id

documento

Condicional

Documento contendo o ID do objeto do documento de origem. Aplica-se apenas a uma origem de change stream do Atlas .

stream.source.initialSync.phase

string

Condicional

Estado atual da operação de sincronização inicial . Aplica-se somente a uma fonte de change stream do Atlas durante a sincronização inicial.

stream.source.kinesisStream

string

Condicional

Nome do Kinesis Data Stream do qual o Atlas Stream Processing fornece documentos. Aplica-se somente a uma origem AWS Kinesis.

stream.source.shardId

string

Condicional

ID do shard dentro do Kinesis Data Stream do qual o Atlas Stream Processing obtém documentos. Aplica-se somente a uma origem AWS Kinesis.

stream.source.sequenceNumber

string

Condicional

Identificador exclusivo do documento obtido no Kinesis Data Stream. Aplica-se somente a uma origem AWS Kinesis.

stream.source.partitionKey

string

Condicional

Identificador único da partição à qual pertence o documento de origem. Aplica-se somente a uma origem AWS Kinesis.

stream.window

documento

Condicional

Documento contendo metadados de janela. Aplica-se somente se o documento foi processado em uma janela.

stream.window.start

Data ISO

Condicional

Tempo de janela aberta. Aplica-se somente se o documento foi processado em uma janela.

stream.window.end

Data ISO

Condicional

Tempo de fechamento da janela. Aplica-se somente se o documento foi processado em uma janela.

stream.window.partition

string

Condicional

Partição da janela de sessão à qual o documento pertence. Aplica-se somente se o documento foi processado em uma janela de sessão.

stream.https

documento

Condicional

Documento contendo metadados para o estágio $https. Aplica-se somente quando a falha de processamento ocorreu no $https estágio.

stream.https.url

string

Condicional

URL de destino do estágio $https. Aplica-se somente quando a falha de processamento ocorreu no estágio $https.

stream.https.method

string

Condicional

Método de solicitação HTTP usado pelo estágio $https. Aplica-se somente quando a falha de processamento ocorreu no estágio $https.

stream.https.httpStatusCode

int

Condicional

Código de status da resposta HTTP da solicitação. Aplica-se somente quando a falha de processamento ocorreu no estágio $https.

stream.https.responseTimeMs

int

Condicional

Tempo de resposta da solicitação em milissegundos. Aplica-se somente quando a falha de processamento ocorreu no estágio $https.

A expressão $meta do Atlas Stream Processing fornece toda a funcionalidade da expressão de agregação existente do MongoDB $meta. No entanto, você não pode usar a funcionalidade específica para a versão do Atlas Stream Processing do $meta em uma consulta de agregação MongoDB padrão.

O exemplo a seguir enriquece a saída de um fluxo com um array dos tópicos de origem do Kafka dos quais os dados foram ingeridos:

{
$source: {
connectionName: "kafka",
topic: ["t1", "t2", "t3"]
}
},
{
$emit: {
connectionName: "kafka",
topic: {
$concat: [
{
$meta: "stream.source.topic"
},
"out"
]
}
}
}

O exemplo a seguir adiciona um campo ao fluxo que relata o relatório da hora de início de cada janela.

{
$source: {
connectionName: "kafka",
topic: "t1"
}
},
{
$hoppingWindow: . . .
},
{
$addFields: {
start: { $meta: "stream.window.start" }
}
}