Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs 菜单
Docs 主页
/ /
/ / /

$setStreamMeta 聚合阶段(流处理)

$setStreamMeta

$setStreamMeta 阶段为传入文档设置元数据字段。您可以使用此元数据根据特定值对文档执行选择性操作,而无需更改文档本身的内容。

$setStreamMeta 阶段具有以下原型形式:

{
$setStreamMeta: {
"<metadata-field>": <expression>
}
}

$setStreamMeta 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

<metadata-field>

表达式(expression)

必需

要应用文档的元数据字段。您必须同时设立键和值:

  • 键必须是以 stream. 开头的字符串。

  • 该值必须是表达式。

如果表达式求值失败, Atlas Stream Processing会将文档发送到 DLQ。

要学习;了解更多信息,请参阅示例。

$setStreamMeta 必须位于 $source 阶段之后且 $emit$merge 阶段之前。

流媒体数据源为通过电子商务平台发出的每个订单生成一个文档。文档采用以下形式:

{
orderId: 1,
productId: "A",
qty: 2
}

Atlas数据库包含一个 products集合,其中包含以下形式的文档:

[
{_id: "A", name: "Laptop", category: "tech"},
{_id: "B", name: "Shirt", category: "clothing"}
]

以下聚合有六个阶段:

  1. $source 阶段从电商平台摄取订单文档,将摄取的每条记录公开给后续聚合阶段。

  2. $lookup 阶段将传入的订单文档联接到 shop数据库中 _id字段的 products集合。生成的数组字段名为 product

  3. $unwind 阶段将 product 的单元素数组值转换为文档。

  4. $setStreamMeta 阶段将名为 stream.coll 的元数据字段设置为等于 product.category 的值。

  5. $unset 阶段删除 product字段,将源文档恢复为其原始形式。

  6. $merge 阶段将文档合并到由 stream.coll 的值动态确定的集合中。

{
$source: {
documents: [
{
orderId: 1,
productId: "A",
qty: 2
},
{
orderId: 2,
productId: "B",
qty: 1
},
{
orderId: 3,
productId: "A",
qty: 5
}
]
}
},
{
$lookup: {
from: {
connectionName: "atlas",
db: "shop",
coll: "products"
},
localField: "productId",
foreignField: "_id",
as: "product"
}
},
{
$unwind: "$product"
},
{
$setStreamMeta: {
"stream.coll": "$product.category"
}
},
{
$unset: [
"product"
]
},
{
$merge: {
into: {
connectionName: "atlas",
db: "shop",
coll: {
$meta: "stream.coll"
}
}
}
}

后退

$sessionWindow

在此页面上