
$meta Aggregation Stage (Stream Processing)

The $meta expression returns an object containing all streaming metadata for a document. You can expose this data either for the entire stream or for one of the following Atlas Stream Processing aggregation stages: $source, a window stage, or $https.

A $meta expression has the following prototype form:

{ "$meta": <string> }

The stream metadata that the expression can return has the following structure:

"source": {
  "type": "<source-type>",
  "ts": {
    "$date": "<datetime>"
  },
  "topic": "<string>",
  "partition": <int>,
  "offset": <int>,
  "key": "<kafka-key>",
  "headers": [
    {
      "k": "<header-key>",
      "v": "<header-value>"
    }
  ],
  "operationType": "<db-operation>",
  "ns": {
    "db": "<namespace-db>",
    "coll": "<namespace-coll>"
  },
  "documentKey": {
    "_id": {
      "$oid": "<object-id>"
    }
  },
  "initialSync": {
    "phase": "<sync-state>"
  },
  "kinesisStream": "<kinesis-name>",
  "shardId": "<kinesis-shard-id>",
  "sequenceNumber": "<doc-uuid>",
  "partitionKey": "<partition-id>"
},
"window": {
  "start": <ISODate>,
  "end": <ISODate>,
  "partition": "<session-partition>"
},
"https": {
  "url": "<target-url>",
  "method": "<request-method>",
  "httpStatusCode": <http-code>,
  "responseTimeMs": <response-time-ms>
}

The $meta expression takes a single string input corresponding to the fully qualified, dot-syntax path of a source of metadata. The root of this path must be "stream". You can query the following paths:

| Path | Type | Conditionality | Description |
| --- | --- | --- | --- |
| stream | object | Always | All metadata for the $source stage and any window stage or $https stage configured in the pipeline. |
| stream.source | document | Always | Document containing metadata for the $source stage. |
| stream.source.type | string | Always | Type of connection used as a source. |
| stream.source.ts | ISODate | Always | Date and time of the record at the point of ingestion. |
| stream.source.topic | string | Conditional | Kafka topic from which the stream ingests records. Applies only to a Kafka source. |
| stream.source.partition | integer | Conditional | Partition of the Kafka topic from which the stream ingests records. Applies only to a Kafka source. |
| stream.source.offset | integer | Conditional | Offset tracking message order and queue position within a Kafka source partition. Applies only to a Kafka source. |
| stream.source.key | string, int, long, double, object, or binData | Conditional | Key assigned to Kafka messages for partitioning and load distribution. Applies only to a Kafka source. |
| stream.source.headers | array | Conditional | Set of key-value pairs describing Kafka message metadata. Applies only to a Kafka source. |
| stream.source.operationType | string | Conditional | Type of database operation that Atlas Stream Processing attempted to perform against the given document. Applies only to an Atlas change stream source. |
| stream.source.ns | document | Conditional | Document containing the namespace from which Atlas Stream Processing sources documents. Applies only to an Atlas change stream source. |
| stream.source.ns.db | string | Conditional | Name of the database against which Atlas Stream Processing attempts operations. Applies only to an Atlas change stream source. The value is the same for all documents from a Collection Change Stream or Database Change Stream source, and varies for a Cluster Change Stream source. |
| stream.source.ns.coll | string | Conditional | Name of the collection against which Atlas Stream Processing attempts operations. Applies only to an Atlas change stream source. The value is the same for all documents from a Collection Change Stream source, and varies for a Database Change Stream or Cluster Change Stream source. |
| stream.source.documentKey._id | document | Conditional | Document containing the Object ID of the source document. Applies only to an Atlas change stream source. |
| stream.source.initialSync.phase | string | Conditional | Current state of the initial sync operation. Applies only to an Atlas change stream source during initial sync. |
| stream.source.kinesisStream | string | Conditional | Name of the Kinesis Data Stream from which Atlas Stream Processing sources documents. Applies only to an AWS Kinesis source. |
| stream.source.shardId | string | Conditional | ID of the shard within the Kinesis Data Stream from which Atlas Stream Processing sources documents. Applies only to an AWS Kinesis source. |
| stream.source.sequenceNumber | string | Conditional | Unique identifier of the document sourced from the Kinesis Data Stream. Applies only to an AWS Kinesis source. |
| stream.source.partitionKey | string | Conditional | Unique identifier of the partition to which the source document belongs. Applies only to an AWS Kinesis source. |
| stream.window | document | Conditional | Document containing window metadata. Applies only if the document was processed in a window. |
| stream.window.start | ISODate | Conditional | Window open time. Applies only if the document was processed in a window. |
| stream.window.end | ISODate | Conditional | Window close time. Applies only if the document was processed in a window. |
| stream.window.partition | string | Conditional | Session window partition to which the document belongs. Applies only if the document was processed in a session window. |
| stream.https | document | Conditional | Document containing metadata for the $https stage. Applies only when the processing failure occurred in the $https stage. |
| stream.https.url | string | Conditional | Target URL of the $https stage. Applies only when the processing failure occurred in the $https stage. |
| stream.https.method | string | Conditional | HTTP request method used by the $https stage. Applies only when the processing failure occurred in the $https stage. |
| stream.https.httpStatusCode | int | Conditional | HTTP response status code of the request. Applies only when the processing failure occurred in the $https stage. |
| stream.https.responseTimeMs | int | Conditional | Response time of the request in milliseconds. Applies only when the processing failure occurred in the $https stage. |
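
To make the dot-syntax paths concrete, the following sketch resolves two of them against a hand-written sample metadata object for a Kafka source. The sampleMeta values and the resolveMetaPath helper are hypothetical and exist only for illustration. They are plain JavaScript, not part of Atlas Stream Processing, and they do not describe how the service evaluates $meta internally.

```javascript
// Hypothetical metadata for one record read from a Kafka source.
// Field names follow the prototype form; the values are made up.
const sampleMeta = {
  stream: {
    source: {
      type: "kafka",
      ts: new Date("2024-01-01T00:00:00Z"),
      topic: "t1",
      partition: 0,
      offset: 42,
    },
  },
};

// Illustrative helper: walk a dot-syntax path whose root must be "stream".
// Returns undefined when a segment is absent from the sample metadata.
function resolveMetaPath(meta, path) {
  const segments = path.split(".");
  if (segments[0] !== "stream") {
    throw new Error('The root of a $meta path must be "stream"');
  }
  return segments.reduce(
    (node, key) => (node == null ? undefined : node[key]),
    meta
  );
}

// A Kafka-specific path has a value in this sample.
const topic = resolveMetaPath(sampleMeta, "stream.source.topic");
// A Kinesis-specific path is absent, because the sample is a Kafka record.
const shardId = resolveMetaPath(sampleMeta, "stream.source.shardId");

console.log(topic, shardId);
```

In this local sketch, the Kinesis-only path has no value because the sample record comes from Kafka, which mirrors the Conditional entries in the table.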

The Atlas Stream Processing $meta expression provides all of the functionality of the existing MongoDB $meta aggregation expression. However, you can't use the functionality specific to the Atlas Stream Processing version of $meta in a standard MongoDB aggregation query.

The following example reads from three Kafka topics and emits each record to an output topic whose name is the record's source topic followed by the suffix "out". For example, records ingested from t1 are emitted to t1out:

{
  $source: {
    connectionName: "kafka",
    topic: ["t1", "t2", "t3"]
  }
},
{
  $emit: {
    connectionName: "kafka",
    topic: {
      $concat: [
        {
          $meta: "stream.source.topic"
        },
        "out"
      ]
    }
  }
}
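
As a sanity check on the naming logic, this sketch reproduces in plain JavaScript the string that the $concat expression above would build for each source topic. It runs locally and does not connect to Kafka or Atlas. The outputTopicFor helper is a hypothetical name used only here.

```javascript
// Source topics taken from the $source stage in the example above.
const sourceTopics = ["t1", "t2", "t3"];

// Mirrors { $concat: [ { $meta: "stream.source.topic" }, "out" ] }.
const outputTopicFor = (sourceTopic) => sourceTopic + "out";

const outputTopics = sourceTopics.map(outputTopicFor);
console.log(outputTopics); // [ 't1out', 't2out', 't3out' ]
```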

The following example adds a field to each document in the stream that reports the start time of the window in which the document was processed:

{
  $source: {
    connectionName: "kafka",
    topic: "t1"
  }
},
{
  $hoppingWindow: . . .
},
{
  $addFields: {
    start: { $meta: "stream.window.start" }
  }
}
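
The same pattern extends to the other window paths. The sketch below, written as a plain JavaScript object, defines an $addFields stage that records both window boundaries. The field names windowStart and windowEnd are arbitrary choices, and the stage is assumed to follow a window stage such as the $hoppingWindow above.

```javascript
// Assumed to run after a window stage, so that stream.window.* applies.
const addWindowBounds = {
  $addFields: {
    windowStart: { $meta: "stream.window.start" }, // window open time
    windowEnd: { $meta: "stream.window.end" },     // window close time
  },
};

console.log(JSON.stringify(addWindowBounds, null, 2));
```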