Docs Menu
Docs Home
/ /
/ / /

$setStreamMeta 集計ステージ(ストリーム プロセシング)

$setStreamMeta

$setStreamMeta ステージは、受信ドキュメントのメタデータフィールドを設定します。このメタデータを使用すると、ドキュメント自体のコンテンツは変更せずに、特定の 値に基づいてドキュメントに対して選択的な操作を実行できます。

$setStreamMeta ステージのプロトタイプ形式は次のとおりです。

{
$setStreamMeta: {
"<metadata-field>": <expression>
}
}

$setStreamMetaステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

<metadata-field>

必須

ドキュメントに適用するメタデータフィールド。キーと値の両方を設定する必要があります。

  • キーは stream. で始まる string である必要があります。

  • 値は式 である必要があります。

式の評価が失敗した場合、Atlas Stream Processing はドキュメントを DLQ に送信します。

詳細については、 例 を参照してください。

$setStreamMetaは、$source ステージの後、$emit または $merge ステージの前に配置する必要があります。

ストリーミングデータソースは、 eコマースプラットフォームで行われた注文ごとにドキュメントを生成します。ドキュメントは次の形式をとります。

{
orderId: 1,
productId: "A",
qty: 2
}

Atlasデータベースには、次の形式のドキュメントを含む productsコレクションが含まれています。

[
{_id: "A", name: "Laptop", category: "tech"},
{_id: "B", name: "Shirt", category: "clothing"}
]

次の集計には 6 つのステージがあります。

  1. ステージは、$source eコマースプラットフォームから注文ドキュメントを取り込み、取り込まれる各レコードを後続の集計ステージに公開します。

  2. $lookup productsshop_idステージは、受信注文ドキュメントを フィールドの データベース内の コレクションに結合します。結果として得られる配列フィールドはproduct と呼ばれます。

  3. $unwindproductステージは、 の単一要素の配列値をドキュメントに変換します。

  4. $setStreamMetaステージでは、 というメタデータフィールドをstream.coll product.categoryの値と等しくなるように設定します。

  5. $unsetステージではproduct フィールドが削除され、ソースドキュメントが元の形式に戻ります。

  6. $merge ステージは、ドキュメントをstream.coll の値によって動的に決定されるコレクションにマージします。

{
$source: {
documents: [
{
orderId: 1,
productId: "A",
qty: 2
},
{
orderId: 2,
productId: "B",
qty: 1
},
{
orderId: 3,
productId: "A",
qty: 5
}
]
}
},
{
$lookup: {
from: {
connectionName: "atlas",
db: "shop",
coll: "products"
},
localField: "productId",
foreignField: "_id",
as: "product"
}
},
{
$unwind: "$product"
},
{
$setStreamMeta: {
"stream.coll": "$product.category"
}
},
{
$unset: [
"product"
]
},
{
$merge: {
into: {
connectionName: "atlas",
db: "shop",
coll: {
$meta: "stream.coll"
}
}
}
}

戻る

$sessionWindow

項目一覧