Definition
The $meta expression returns an object containing all streaming metadata for a document. You can expose this data for the entire stream, or for one of the following Atlas Stream Processing aggregation stages: $source, a window stage, or $https.
A $meta expression has the following prototype form:
{ "$meta": <string> }
"source": { "type": "<source-type>", "ts": { "$date": "<datetime>" }, "topic": "<string>", "partition": <int>, "offset": <int>, "key": "<kafka-key>", "headers": [ { "k": "<header-key>", "v": "<header-value>" } ], "operationType": "<db-operation>", "ns": { "db": "<namespace-db>", "coll": "<namespace-coll>" }, "documentKey": { "_id": { "$oid": "<object-id>" } }, "initialSync": { "phase": "<sync-state>" } "kinesisStream": "<kinesis-name>", "shardId": "<kinesis-shard-id>", "sequenceNumber": "<doc-uuid>", "partitionKey": "<partition-id>", } "window": { "start": <ISODate>, "end": <ISODate>, "partition": "<session-partition>" }, "https": { "url": "<target-url>", "method": "<request-method>", "httpStatusCode": <http-code>, "responseTimeMs": <response-time-ms> }
Syntax
The $meta expression takes a single string input corresponding to the fully qualified, dot-syntax path of a source of metadata. The root of this path must be "stream". You can query the following paths:
| Path | Type | Conditionality | Description |
|---|---|---|---|
| stream | object | Always | Object containing all streaming metadata for the document. |
| stream.source | document | Always | Document containing metadata for the $source stage. |
| stream.source.type | string | Always | Type of connection used as a source. |
| stream.source.ts | ISODate | Always | Date and time of the record at the point of ingestion. |
| stream.source.topic | string | Conditional | Kafka topic from which the stream ingests records. Applies only to a Kafka source. |
| stream.source.partition | integer | Conditional | Partition of the Kafka topic from which the stream ingests records. Applies only to a Kafka source. |
| stream.source.offset | integer | Conditional | Offset tracking message order and queue position within a Kafka source partition. Applies only to a Kafka source. |
| stream.source.key | string, int, long, double, object, or binData | Conditional | Key assigned to Kafka messages for partitioning and load distribution. Applies only to a Kafka source. |
| stream.source.headers | array | Conditional | Set of key-value pairs describing Kafka message metadata. Applies only to a Kafka source. |
| stream.source.operationType | string | Conditional | Type of database operation which Atlas Stream Processing attempted to perform against the given document. Applies only to an Atlas change stream source. |
| stream.source.ns | document | Conditional | Document containing the namespace from which Atlas Stream Processing sources documents. Applies only to an Atlas change stream source. |
| stream.source.ns.db | string | Conditional | Name of the database against which Atlas Stream Processing attempts operations. Applies only to an Atlas change stream source. This value is the same for all documents for a Collection Change Stream or Database Change Stream source. Varies for a Cluster Change Stream source. |
| stream.source.ns.coll | string | Conditional | Name of the collection against which Atlas Stream Processing attempts operations. Applies only to an Atlas change stream source. This value is the same for all documents for a Collection Change Stream source. Varies for a Database Change Stream or Cluster Change Stream source. |
| stream.source.documentKey | document | Conditional | Document containing the Object ID of the source document. Applies only to an Atlas change stream source. |
| stream.source.initialSync.phase | string | Conditional | Current state of the initial sync operation. Applies only to an Atlas change stream source during initial sync. |
| stream.source.kinesisStream | string | Conditional | Name of the Kinesis Data Stream from which Atlas Stream Processing sources documents. Applies only to an AWS Kinesis source. |
| stream.source.shardId | string | Conditional | ID of the shard within the Kinesis Data Stream from which Atlas Stream Processing sources documents. Applies only to an AWS Kinesis source. |
| stream.source.sequenceNumber | string | Conditional | Unique identifier of the document sourced from the Kinesis Data Stream. Applies only to an AWS Kinesis source. |
| stream.source.partitionKey | string | Conditional | Unique identifier of the partition to which the source document belongs. Applies only to an AWS Kinesis source. |
| stream.window | document | Conditional | Document containing window metadata. Applies only if the document was processed in a window. |
| stream.window.start | ISODate | Conditional | Window open time. Applies only if the document was processed in a window. |
| stream.window.end | ISODate | Conditional | Window close time. Applies only if the document was processed in a window. |
| stream.window.partition | string | Conditional | Session window partition to which the document belongs. Applies only if the document was processed in a session window. |
| stream.https | document | Conditional | Document containing metadata for the $https stage. Applies only when the processing failure occurred in the $https stage. |
| stream.https.url | string | Conditional | Target URL of the $https stage. |
| stream.https.method | string | Conditional | HTTP request method used by the $https stage. |
| stream.https.httpStatusCode | int | Conditional | HTTP response status code of the request. Applies only when the processing failure occurred in the $https stage. |
| stream.https.responseTimeMs | int | Conditional | Response time of the request in milliseconds. Applies only when the processing failure occurred in the $https stage. |
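To make the dot-syntax paths concrete, here is a small local simulation of how a path selects a value from a metadata object. It is only an illustration of the path semantics, not how Atlas Stream Processing evaluates $meta. The resolveMetaPath helper and the sample values (including the "kafka" source type) are hypothetical.

```javascript
// Local illustration only: mimics how a dot-syntax path selects a value
// from a metadata object. Not the Atlas implementation.
function resolveMetaPath(meta, path) {
  const [root, ...rest] = path.split(".");
  if (root !== "stream") {
    throw new Error('path must start with "stream"');
  }
  // Walk the remaining segments; yield undefined if one is missing.
  return rest.reduce(
    (node, key) =>
      node !== null && typeof node === "object" ? node[key] : undefined,
    meta
  );
}

// Hypothetical metadata for a record read from a Kafka source.
const sampleMeta = {
  source: { type: "kafka", topic: "t1", partition: 0, offset: 42 }
};

console.log(resolveMetaPath(sampleMeta, "stream.source.topic")); // "t1"
console.log(resolveMetaPath(sampleMeta, "stream.window.start")); // undefined
```

In this sketch a conditional field that does not apply, such as window metadata for a record that never passed through a window, simply resolves to undefined. Treat that as a property of the simulation rather than a statement about what Atlas returns.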
Behavior
The Atlas Stream Processing $meta expression provides all of the functionality of the existing MongoDB $meta aggregation expression. However, you can't use the functionality specific to the Atlas Stream Processing version of $meta in a standard MongoDB aggregation query.
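Because stream-specific paths don't work in a standard aggregation query, it can help to check a pipeline before reusing it outside Atlas Stream Processing. The following sketch shows one way to do that. The usesStreamMeta helper is hypothetical and not part of any MongoDB API. It treats any $meta argument equal to "stream" or beginning with "stream." as stream-specific, and uses "textScore" as an example of a standard $meta keyword.

```javascript
// Hypothetical helper: reports whether a pipeline contains a $meta
// expression whose path is rooted at "stream".
function usesStreamMeta(node) {
  if (Array.isArray(node)) {
    return node.some(usesStreamMeta);
  }
  if (node !== null && typeof node === "object") {
    const arg = node.$meta;
    if (
      typeof arg === "string" &&
      (arg === "stream" || arg.startsWith("stream."))
    ) {
      return true;
    }
    // Recurse into nested stages and expressions.
    return Object.values(node).some(usesStreamMeta);
  }
  return false;
}

const standardStage = { $addFields: { score: { $meta: "textScore" } } };
const streamStage = { $addFields: { start: { $meta: "stream.window.start" } } };

console.log(usesStreamMeta([standardStage])); // false
console.log(usesStreamMeta([streamStage]));   // true
```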
Examples
The following example routes each record to an output Kafka topic named after the topic from which it was ingested. The $emit stage builds the target topic name by appending "out" to the value of stream.source.topic:
    { $source: { connectionName: "kafka", topic: ["t1", "t2", "t3"] } },
    {
      $emit: {
        connectionName: "kafka",
        topic: { $concat: [ { $meta: "stream.source.topic" }, "out" ] }
      }
    }
The following example adds a field to the stream reporting the start time of each window:

    { $source: { connectionName: "kafka", topic: "t1" } },
    { $hoppingWindow: . . . },
    { $addFields: { start: { $meta: "stream.window.start" } } }