定義
$setStreamMeta ステージは、受信ドキュメントのメタデータフィールドを設定します。このメタデータを使用すると、ドキュメント自体のコンテンツは変更せずに、特定の 値に基づいてドキュメントに対して選択的な操作を実行できます。
構文
$setStreamMeta ステージのプロトタイプ形式は次のとおりです。
{ $setStreamMeta: { "<metadata-field>": <expression> } }
$setStreamMetaステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
|---|---|---|---|
| 式 | 必須 | ドキュメントに適用するメタデータフィールド。キーと値の両方を設定する必要があります。
式の評価が失敗した場合、Atlas Stream Processing はドキュメントを DLQ に送信します。 詳細については、 例 を参照してください。 |
動作
$setStreamMetaは、$source ステージの後、$emit または $merge ステージの前に配置する必要があります。
例
ストリーミングデータソースは、 eコマースプラットフォームで行われた注文ごとにドキュメントを生成します。ドキュメントは次の形式をとります。
{ orderId: 1, productId: "A", qty: 2 }
Atlasデータベースには、次の形式のドキュメントを含む productsコレクションが含まれています。
[ {_id: "A", name: "Laptop", category: "tech"}, {_id: "B", name: "Shirt", category: "clothing"} ]
次の集計には 6 つのステージがあります。
ステージは、
$sourceeコマースプラットフォームから注文ドキュメントを取り込み、取り込まれる各レコードを後続の集計ステージに公開します。$lookup
productsshop_idステージは、受信注文ドキュメントを フィールドの データベース内の コレクションに結合します。結果として得られる配列フィールドはproductと呼ばれます。$unwindproductステージは、 の単一要素の配列値をドキュメントに変換します。$setStreamMetaステージでは、 というメタデータフィールドをstream.collproduct.categoryの値と等しくなるように設定します。$unsetステージではproductフィールドが削除され、ソースドキュメントが元の形式に戻ります。$merge ステージは、ドキュメントを
stream.collの値によって動的に決定されるコレクションにマージします。
{ $source: { documents: [ { orderId: 1, productId: "A", qty: 2 }, { orderId: 2, productId: "B", qty: 1 }, { orderId: 3, productId: "A", qty: 5 } ] } }, { $lookup: { from: { connectionName: "atlas", db: "shop", coll: "products" }, localField: "productId", foreignField: "_id", as: "product" } }, { $unwind: "$product" }, { $setStreamMeta: { "stream.coll": "$product.category" } }, { $unset: [ "product" ] }, { $merge: { into: { connectionName: "atlas", db: "shop", coll: { $meta: "stream.coll" } } } }