Docs Menu
Docs Home
/ /
/ / /

$setStreamMeta Stage de agregación (Stream Processing)

$setStreamMeta

La etapa $setStreamMeta define los campos de metadatos para los documentos entrantes. Puede usar estos metadatos para realizar operaciones selectivas en los documentos según valores específicos sin alterar su contenido.

Una etapa $setStreamMeta tiene la siguiente forma de prototipo:

{
$setStreamMeta: {
"<metadata-field>": <expression>
}
}

La etapa $setStreamMeta procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

<metadata-field>

expresión

Requerido

Campo de metadatos para aplicar a los documentos. Debes configurar tanto la clave como el valor:

  • La clave debe ser una cadena que comience con stream..

  • El valor debe ser una expresión.

Si falla la evaluación de la expresión, Atlas Stream Processing envía el documento a la DLQ.

Para obtener más información, consulte los ejemplos.

$setStreamMeta debe venir después de tu $source etapa y antes de tu etapa $emit o $merge.

Una fuente de datos de streaming genera un documento por cada pedido realizado en una plataforma de comercio electrónico. Los documentos tienen el siguiente formato:

{
orderId: 1,
productId: "A",
qty: 2
}

Una base de datos Atlas contiene una colección products con documentos del siguiente formato:

[
{_id: "A", name: "Laptop", category: "tech"},
{_id: "B", name: "Shirt", category: "clothing"}
]

La siguiente agregación tiene seis etapas:

  1. La etapa ingiere documentos de pedidos desde la plataforma de comercio electrónico, exponiendo cada registro a medida que se ingiere a las etapas de agregación $source posteriores.

  2. La etapa $lookup une los documentos del pedido entrante a una products colección en la shop base de datos, en el _id campo. El campo de matriz resultante se product denomina.

  3. La etapa convierte el valor de la matriz de un solo elemento $unwind de product en un documento.

  4. La etapa establece un campo de metadatos $setStreamMeta llamado stream.coll igual al valor product.category de.

  5. La etapa elimina $unset el product campo y devuelve el documento fuente a su forma original.

  6. La etapa $merge fusiona el documento en una colección determinada dinámicamente por el valor stream.coll de.

{
$source: {
documents: [
{
orderId: 1,
productId: "A",
qty: 2
},
{
orderId: 2,
productId: "B",
qty: 1
},
{
orderId: 3,
productId: "A",
qty: 5
}
]
}
},
{
$lookup: {
from: {
connectionName: "atlas",
db: "shop",
coll: "products"
},
localField: "productId",
foreignField: "_id",
as: "product"
}
},
{
$unwind: "$product"
},
{
$setStreamMeta: {
"stream.coll": "$product.category"
}
},
{
$unset: [
"product"
]
},
{
$merge: {
into: {
connectionName: "atlas",
db: "shop",
coll: {
$meta: "stream.coll"
}
}
}
}

Volver

$sessionWindow

En esta página