Definition
The $setStreamMeta stage sets metadata fields for incoming
documents. You can use this metadata to perform selective operations
on documents based on specific values without altering the content of
the documents themselves.
Syntax
A $setStreamMeta stage has the following prototype form:
{ $setStreamMeta: { "<metadata-field>": <expression> } }
The $setStreamMeta stage takes a document with the following
fields:
Field | Type | Necessity | Description |
|---|---|---|---|
| expression | Required |
Behavior
$setStreamMeta must come after your $source
stage and before your $emit or $merge stage.
Examples
A streaming data source generates a document for each order made with an e-commerce platform. The documents take the following form:
{ orderId: 1, productId: "A", qty: 2 }
An Atlas database contains a products collection with
documents of the following form:
[ {_id: "A", name: "Laptop", category: "tech"}, {_id: "B", name: "Shirt", category: "clothing"} ]
The following aggregation has six stages:
The
$sourcestage ingests order documents from the e-commerce platform, exposing each record as it is ingeste to the subsequent aggregation stages.The $lookup stage joins the incoming order documents to a
productscollection in theshopdatabase on the_idfield. The resulting array field is calledproduct.The
$unwindstage turns the single-element array value ofproductinto a document.The
$setStreamMetastage sets a metadata field calledstream.collequal to the value ofproduct.category.The
$unsetstage removes theproductfield, returning the source document to its original form.The $merge stage merges the document into a collection dynamically determined by the value of
stream.coll.
{ $source: { documents: [ { orderId: 1, productId: "A", qty: 2 }, { orderId: 2, productId: "B", qty: 1 }, { orderId: 3, productId: "A", qty: 5 } ] } }, { $lookup: { from: { connectionName: "atlas", db: "shop", coll: "products" }, localField: "productId", foreignField: "_id", as: "product" } }, { $unwind: "$product" }, { $setStreamMeta: { "stream.coll": "$product.category" } }, { $unset: [ "product" ] }, { $merge: { into: { connectionName: "atlas", db: "shop", coll: { $meta: "stream.coll" } } } }