Definición
La fase $setStreamMeta establece campos de metadatos para los documentos entrantes. Puedes utilizar estos metadatos para realizar operaciones selectivas en documentos basados en valores específicos sin alterar el contenido de los propios documentos.
Sintaxis
Un $setStreamMeta etapa tiene la siguiente forma prototipo:
{ $setStreamMeta: { "<metadata-field>": <expression> } }
La etapa $setStreamMeta procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| expresión | Requerido | Campo de metadatos para aplicar a los documentos. Debes configurar tanto la clave como el valor:
Si la evaluación de la expresión falla, Atlas Stream Processing envía el documento al DLQ. Para obtener más información, consulte los ejemplos. |
Comportamiento
$setStreamMeta debe ir después de la etapa $source y antes de la etapa $emit o $merge.
Ejemplos
Una fuente de datos de transmisión genera un documento por cada pedido realizado en una plataforma de comercio electrónico. Los documentos adoptan la siguiente forma:
{ orderId: 1, productId: "A", qty: 2 }
Una base de datos de Atlas contiene una colección products con documentos de la siguiente forma:
[ {_id: "A", name: "Laptop", category: "tech"}, {_id: "B", name: "Shirt", category: "clothing"} ]
La siguiente agregación tiene seis etapas:
La etapa
$sourcerecopila documentos de pedidos de la plataforma de comercio electrónico, exponiendo cada registro a medida que se recopila a las siguientes etapas de agregación.La etapa $lookup une los documentos del pedido entrante a una
productscolección en lashopbase de datos, en el_idcampo. El campo de matriz resultante seproductdenomina.La etapa
$unwindconvierte el valor del arreglo de un solo elemento deproducten un documento.La etapa establece un campo de metadatos
$setStreamMetallamadostream.colligual al valorproduct.categoryde.La etapa
$unsetremueve el campoproduct, devolviendo el documento de origen a su forma original.La etapa $merge fusiona el documento en una colección determinada dinámicamente por el valor de
stream.coll.
{ $source: { documents: [ { orderId: 1, productId: "A", qty: 2 }, { orderId: 2, productId: "B", qty: 1 }, { orderId: 3, productId: "A", qty: 5 } ] } }, { $lookup: { from: { connectionName: "atlas", db: "shop", coll: "products" }, localField: "productId", foreignField: "_id", as: "product" } }, { $unwind: "$product" }, { $setStreamMeta: { "stream.coll": "$product.category" } }, { $unset: [ "product" ] }, { $merge: { into: { connectionName: "atlas", db: "shop", coll: { $meta: "stream.coll" } } } }