Docs Menu
Docs Home
/ /
/ / /

$setStreamMeta Aggregation Stage (Stream Processing)

$setStreamMeta

The $setStreamMeta stage sets metadata fields for incoming documents. You can use this metadata to perform selective operations on documents based on specific values without altering the content of the documents themselves.

A $setStreamMeta stage has the following prototype form:

{
$setStreamMeta: {
"<metadata-field>": <expression>
}
}

The $setStreamMeta stage takes a document with the following fields:

Field
Type
Necessity
Description

<metadata-field>

expression

Required

Metadata field to apply to documents. You must set both the key and value:

  • The key must be a string beginning with stream..

  • The value must be an expression.

If evaluating the expression fails, Atlas Stream Processing sends the document to the DLQ.

To learn more, see the Examples.

$setStreamMeta must come after your $source stage and before your $emit or $merge stage.

A streaming data source generates a document for each order made with an e-commerce platform. The documents take the following form:

{
orderId: 1,
productId: "A",
qty: 2
}

An Atlas database contains a products collection with documents of the following form:

[
{_id: "A", name: "Laptop", category: "tech"},
{_id: "B", name: "Shirt", category: "clothing"}
]

The following aggregation has six stages:

  1. The $source stage ingests order documents from the e-commerce platform, exposing each record as it is ingeste to the subsequent aggregation stages.

  2. The $lookup stage joins the incoming order documents to a products collection in the shop database on the _id field. The resulting array field is called product.

  3. The $unwind stage turns the single-element array value of product into a document.

  4. The $setStreamMeta stage sets a metadata field called stream.coll equal to the value of product.category.

  5. The $unset stage removes the product field, returning the source document to its original form.

  6. The $merge stage merges the document into a collection dynamically determined by the value of stream.coll.

{
$source: {
documents: [
{
orderId: 1,
productId: "A",
qty: 2
},
{
orderId: 2,
productId: "B",
qty: 1
},
{
orderId: 3,
productId: "A",
qty: 5
}
]
}
},
{
$lookup: {
from: {
connectionName: "atlas",
db: "shop",
coll: "products"
},
localField: "productId",
foreignField: "_id",
as: "product"
}
},
{
$unwind: "$product"
},
{
$setStreamMeta: {
"stream.coll": "$product.category"
}
},
{
$unset: [
"product"
]
},
{
$merge: {
into: {
connectionName: "atlas",
db: "shop",
coll: {
$meta: "stream.coll"
}
}
}
}

Back

$sessionWindow

On this page