Definición
La etapa $setStreamMeta define los campos de metadatos para los documentos entrantes. Puede usar estos metadatos para realizar operaciones selectivas en los documentos según valores específicos sin alterar su contenido.
Sintaxis
Una etapa $setStreamMeta tiene la siguiente forma de prototipo:
{ $setStreamMeta: { "<metadata-field>": <expression> } }
La etapa $setStreamMeta procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| expresión | Requerido | Campo de metadatos para aplicar a los documentos. Debes configurar tanto la clave como el valor:
Si falla la evaluación de la expresión, Atlas Stream Processing envía el documento a la DLQ. Para obtener más información, consulte los ejemplos. |
Comportamiento
$setStreamMeta debe venir después de tu $source etapa y antes de tu etapa $emit o $merge.
Ejemplos
Una fuente de datos de streaming genera un documento por cada pedido realizado en una plataforma de comercio electrónico. Los documentos tienen el siguiente formato:
{ orderId: 1, productId: "A", qty: 2 }
Una base de datos Atlas contiene una colección products con documentos del siguiente formato:
[ {_id: "A", name: "Laptop", category: "tech"}, {_id: "B", name: "Shirt", category: "clothing"} ]
La siguiente agregación tiene seis etapas:
La etapa ingiere documentos de pedidos desde la plataforma de comercio electrónico, exponiendo cada registro a medida que se ingiere a las etapas de agregación
$sourceposteriores.La etapa $lookup une los documentos del pedido entrante a una
productscolección en lashopbase de datos, en el_idcampo. El campo de matriz resultante seproductdenomina.La etapa convierte el valor de la matriz de un solo elemento
$unwinddeproducten un documento.La etapa establece un campo de metadatos
$setStreamMetallamadostream.colligual al valorproduct.categoryde.La etapa elimina
$unsetelproductcampo y devuelve el documento fuente a su forma original.La etapa $merge fusiona el documento en una colección determinada dinámicamente por el valor
stream.collde.
{ $source: { documents: [ { orderId: 1, productId: "A", qty: 2 }, { orderId: 2, productId: "B", qty: 1 }, { orderId: 3, productId: "A", qty: 5 } ] } }, { $lookup: { from: { connectionName: "atlas", db: "shop", coll: "products" }, localField: "productId", foreignField: "_id", as: "product" } }, { $unwind: "$product" }, { $setStreamMeta: { "stream.coll": "$product.category" } }, { $unset: [ "product" ] }, { $merge: { into: { connectionName: "atlas", db: "shop", coll: { $meta: "stream.coll" } } } }