Menu Docs
Página inicial do Docs
/ /
/ / /

$setStreamMeta Estágio de agregação (processamento de fluxo)

$setStreamMeta

O estágio $setStreamMeta define campos de metadados para documentos recebidos. Você pode usar esses metadados para executar operações seletivas em documentos com base em valores específicos sem alterar o conteúdo dos próprios documentos.

Um estágio $setStreamMeta tem o seguinte formato de protótipo:

{
$setStreamMeta: {
"<metadata-field>": <expression>
}
}

O estágio $setStreamMeta recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

<metadata-field>

expressão

Obrigatório

campo de metadados a ser aplicado a documentos. Você deve definir a chave e o valor:

  • A chave deve ser uma string começando com stream..

  • O valor deve ser uma expressão.

Se a avaliação da expressão falhar, o Atlas Stream Processing enviará o documento para o DLQ.

Para saber mais, consulte os Exemplos.

$setStreamMeta deve vir após o estágio $source e antes do estágio $emit ou $merge.

Uma fonte de dados de streaming gera um documento para cada pedido feito com uma plataforma e-commerce . Os documentos assumem o seguinte formato:

{
orderId: 1,
productId: "A",
qty: 2
}

Um banco de dados do Atlas contém uma coleção products com documentos do seguinte formato:

[
{_id: "A", name: "Laptop", category: "tech"},
{_id: "B", name: "Shirt", category: "clothing"}
]

A seguinte agregação tem seis estágios:

  1. O estágio ingere documentos de pedido da plataforma e-commerce , expondo cada registro à medida que é ingerido para os estágios de agregação $source subsequentes.

  2. O estágio $lookup une os documentos do pedido recebido a uma products collection no shop banco de dados no _id campo . O campo de array resultante é product chamado.

  3. O estágio transforma o valor da array de elemento único $unwind de product em um documento.

  4. O estágio define um campo de metadados $setStreamMeta chamado stream.coll igual ao valor product.category de.

  5. O estágio remove $unset o product campo, retornando o documento de origem à sua forma original.

  6. O estágio $merge mescla o documento em uma coleção determinada dinamicamente pelo valor stream.coll de.

{
$source: {
documents: [
{
orderId: 1,
productId: "A",
qty: 2
},
{
orderId: 2,
productId: "B",
qty: 1
},
{
orderId: 3,
productId: "A",
qty: 5
}
]
}
},
{
$lookup: {
from: {
connectionName: "atlas",
db: "shop",
coll: "products"
},
localField: "productId",
foreignField: "_id",
as: "product"
}
},
{
$unwind: "$product"
},
{
$setStreamMeta: {
"stream.coll": "$product.category"
}
},
{
$unset: [
"product"
]
},
{
$merge: {
into: {
connectionName: "atlas",
db: "shop",
coll: {
$meta: "stream.coll"
}
}
}
}

Voltar

$sessionWindow

Nesta página