Docs 菜单
Docs 主页
/ /
/ / /

$setStreamMeta 聚合阶段(流处理)

$setStreamMeta

$setStreamMeta 阶段为传入文档设置元数据字段。您可以使用此元数据根据特定值对文档执行选择性操作,而无需更改文档本身的内容。

$setStreamMeta 阶段具有以下原型形式:

{
$setStreamMeta: {
"<metadata-field>": <expression>
}
}

$setStreamMeta 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

<metadata-field>

表达式(expression)

必需

要应用文档的元数据字段。您必须同时设立键和值:

  • 键必须是以 stream. 开头的字符串。

  • 该值必须是表达式。

如果表达式求值失败, Atlas Stream Processing会将文档发送到 DLQ。

要学习;了解更多信息,请参阅示例。

$setStreamMeta必须位于$source 阶段之后且 $emit $merge 阶段之前。

流媒体数据源为通过电子商务平台发出的每个订单生成一个文档。文档采用以下形式:

{
orderId: 1,
productId: "A",
qty: 2
}

Atlas数据库包含一个 products集合,其中包含以下形式的文档:

[
{_id: "A", name: "Laptop", category: "tech"},
{_id: "B", name: "Shirt", category: "clothing"}
]

以下聚合有六个阶段:

  1. $source阶段从电子商务平台摄取订单文档,将摄取的每条记录公开给后续聚合阶段。

  2. $lookup 阶段将传入的订单文档联接到products shop数据库中_id 字段的 集合。生成的大量字段名为product

  3. $unwind阶段将product 的单元素大量值转换为文档。

  4. $setStreamMeta阶段将名为stream.coll 的元数据字段设置为等于product.category 的值。

  5. $unset阶段删除product 字段,将源文档恢复为其原始形式。

  6. $merge 阶段将文档合并到由stream.coll 的值动态确定的集合中。

{
$source: {
documents: [
{
orderId: 1,
productId: "A",
qty: 2
},
{
orderId: 2,
productId: "B",
qty: 1
},
{
orderId: 3,
productId: "A",
qty: 5
}
]
}
},
{
$lookup: {
from: {
connectionName: "atlas",
db: "shop",
coll: "products"
},
localField: "productId",
foreignField: "_id",
as: "product"
}
},
{
$unwind: "$product"
},
{
$setStreamMeta: {
"stream.coll": "$product.category"
}
},
{
$unset: [
"product"
]
},
{
$merge: {
into: {
connectionName: "atlas",
db: "shop",
coll: {
$meta: "stream.coll"
}
}
}
}

后退

$sessionWindow

在此页面上