Definición
La fase $setStreamMeta establece campos de metadatos para los documentos entrantes. Puedes utilizar estos metadatos para realizar operaciones selectivas en documentos basados en valores específicos sin alterar el contenido de los propios documentos.
Sintaxis
Un $setStreamMeta etapa tiene la siguiente forma prototipo:
{ $setStreamMeta: { "<metadata-field>": <expression> } }
La etapa $setStreamMeta procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| expresión | Requerido | Campo de metadatos que se aplicará a los documentos. Debes establecer tanto la clave como el valor:
Si no se puede evaluar la expresión, Atlas Stream Processing envía el documento al DLQ. Para obtener más información, consulta los Ejemplos. |
Comportamiento
$setStreamMeta debe venir después de tu $source etapa y antes de tu etapa $emit o $merge.
Ejemplos
Una fuente de datos de transmisión genera un documento por cada pedido realizado en una plataforma de comercio electrónico. Los documentos adoptan la siguiente forma:
{ orderId: 1, productId: "A", qty: 2 }
Una base de datos de Atlas contiene una colección products con documentos de la siguiente forma:
[ {_id: "A", name: "Laptop", category: "tech"}, {_id: "B", name: "Shirt", category: "clothing"} ]
La siguiente agregación tiene seis etapas:
La etapa
$sourcerecopila documentos de pedidos de la plataforma de comercio electrónico, exponiendo cada registro a medida que se recopila a las siguientes etapas de agregación.La etapa $lookup une los documentos de los pedidos entrantes a una colección de
productsen la base de datosshopen el campo_id. El campo de arreglo resultante se llamaproduct.La etapa
$unwindconvierte el valor del arreglo de un solo elemento deproducten un documento.La etapa establece un campo de metadatos
$setStreamMetallamadostream.colligual al valorproduct.categoryde.La etapa
$unsetremueve el campoproduct, devolviendo el documento de origen a su forma original.La etapa $merge fusiona el documento en una colección determinada dinámicamente por el valor de
stream.coll.
{ $source: { documents: [ { orderId: 1, productId: "A", qty: 2 }, { orderId: 2, productId: "B", qty: 1 }, { orderId: 3, productId: "A", qty: 5 } ] } }, { $lookup: { from: { connectionName: "atlas", db: "shop", coll: "products" }, localField: "productId", foreignField: "_id", as: "product" } }, { $unwind: "$product" }, { $setStreamMeta: { "stream.coll": "$product.category" } }, { $unset: [ "product" ] }, { $merge: { into: { connectionName: "atlas", db: "shop", coll: { $meta: "stream.coll" } } } }