Menu Docs
Página inicial do Docs
/ /
/ / /

$merge (Processamento de Stream)

A fase $merge especifica uma conexão no registro de conexão para gravar mensagens. A conexão deve ser uma conexão do Atlas.

Um estágio de pipeline do $merge tem a seguinte forma de protótipo:

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>",
"whenNotMatched": "insert | discard | expression",
"parallelism": <integer>
}
}

A versão Atlas Stream Processing do $merge usa a maioria dos mesmos campos que a versão Atlas Data Federation. O Atlas Stream Processing também utiliza os seguintes campos que são exclusivos de sua implementação de $merge ou foram modificados para se adequar a ela. Para aprender mais sobre os campos compartilhados com o Atlas Data Federation $merge, consulte sintaxe $merge.

Campo
necessidade
Descrição

into

Obrigatório

Simplificado para reproduzir o suporte do Atlas Stream Processing para $merge apenas em conexões do Atlas.

Para aprender mais, consulte esta descrição dos campos do Atlas Data Federation $merge.

whenMatched

Opcional

Amplia a funcionalidade em comparação com o estágio $merge do Atlas Data Federation, com suporte para "delete" e expressões dinâmicas.

Quando configurado para "delete", o Atlas exclui todas as mensagens que atendem à condição da coleção de destino.

Ao usar um valor de expressão dinâmica, ele deverá ser resolvido para uma das seguintes strings:

  • "merge"

  • "replace"

  • "keepExisting"

  • "delete"

whenNotMatched

Opcional

Amplia a funcionalidade em comparação com o estágio $merge do Atlas Data Federation, oferecendo suporte para expressões dinâmicas.

Ao usar um valor de expressão dinâmica, ele deverá ser resolvido para uma das seguintes strings:

  • "insert"

  • "discard"

  • "expression"

parallelism

Condicional

Número de threads para distribuir operações de gravação. Deve ser um valor inteiro entre 1 e 16. Valores mais elevados de paralelismo aumentam a taxa de transferência. No entanto, valores mais altos também exigem que o processador de fluxo e o cluster para o qual ele grava usem mais recursos computacionais.

Se utilizar um valor de expressão dinâmica para into.coll ou into.db, não poderá definir esse valor como maior que 1.

$merge deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $merge por pipeline.

O campo on tem requisitos especiais para $merge em relação a coleções fragmentadas. Para saber mais, consulte Sintaxe $merge.

Se utilizar um valor de expressão dinâmica para into.coll ou into.db, não poderá definir um valor de parallelism superior a 1.

$merge não é possível gravar em coleções de séries temporais. Para gravar documentos em coleções de séries temporais, utilize o estágio $emit.

Você precisa ter a função de administrador do Atlas para usar $merge em coleções fragmentadas.

Você pode usar uma expressão dinâmica como o valor dos seguintes campos:

  • into.db

  • into.coll

Isso permite que seu processador de fluxo grave mensagens em diferentes collection de destino do Atlas, mensagem por mensagem.

Exemplo

Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para classificar cada um deles em um reconhecimento de data center e collection Atlas distintos, você pode escrever o seguinte estágio $merge :

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}
}

Este estágio $merge :

  • Escreve a mensagem Very Important Industries para uma collection Atlas denominada VIP.subscription.

  • Escreve a mensagem N. E. Buddy para uma collection Atlas denominada employee.requisition.

  • Escreve a mensagem Khan Traktor para uma collection Atlas denominada contractor.billableHours.

Você só pode usar expressões dinâmicas que avaliam para strings. Para obter mais informações sobre expressões dinâmicas, consulte operadores de expressão.

Se você especificar um banco de dados ou uma coleção com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão para uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a fila de mensagens não entregues, se configurada, e processará as mensagens subsequentes. Se não houver uma fila de mensagens não entregues configurada, o Atlas Stream Processing ignorará completamente a mensagem e processará as mensagens subsequentes.

Para salvar dados de streaming de vários Apache Kafka Topics em coleções em seu cluster Atlas , use o $merge estágio com o $source estágio. O estágio $source especifica os tópicos a partir dos quais ler os dados. O estágio $merge grava os dados na coleção de destino.

Use a seguinte sintaxe:

{
"$source": {
"connectionName": "<registered-kafka-connection>",
"topic": [ "<topic-name-1>", "<topic-name-2>", ... ]
}
},
{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
}
},
...
}

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:

  1. O estágio $source estabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o como ingestionTime.

  2. O estágio $match exclui documentos que têm um dewPoint.value menor ou igual a 5.0 e passa os documentos com dewPoint.value maior que 5.0 para o próximo estágio.

  3. O estágio $merge grava a saída na coleção do Atlas chamada stream no banco de dados sample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

Você pode usar os parâmetros $merge.whenMatched e $merge.whenNotMatched para replicar os efeitos dos eventos de fluxo de alteração de acordo com o tipo de operação.

A seguinte agregação tem quatro fases:

  1. O estágio $source estabelece uma conexão com a coleção db1.coll1 em um cluster do Atlas por meio da conexão atlas1.

  2. O estágio $addFields enriquece os documentos ingeridos com um campo fullDocument._isDelete definido como o resultado de uma verificação de igualdade entre o valor "$operationType de cada documento e "delete". Essa igualdade é considerada para um valor booleano.

  3. O estágio $replaceRoot substitui o documento pelo valor do campo $fullDocument enriquecido.

  4. O estágio $merge grava em db1.coll1 através da conexão atlas2, realizando duas verificações em cada documento:

    • Primeiro, o campo whenMatched verifica se o documento corresponde a um documento existente na coleção db1.coll1 usando _id, que é o campo de correspondência padrão, já que on não está explicitamente definido. Se isso acontecer e fullDocument._isDelete estiver definido como true, então o Atlas exclui o documento correspondente. Se corresponder e fullDocument._isDelete estiver definido como false, o Atlas substituirá o documento correspondente pelo novo da fonte de dados de streaming.

    • Em segundo lugar, se o Atlas Stream Processing não encontrar tal documento correspondente e fullDocument._isDelete for verdadeiro, o Atlas descartará o documento em vez de gravá-lo na coleção. Se não houver um documento correspondente e fullDocument._isDelete for falso, o Atlas insere o documento da fonte de dados de streaming na coleção.

{
$source: {
connectionName: “atlas1”,
db: “db1”,
coll: “coll1”,
fullDocument: “required”
}
},
{
$addFields: {
“fullDocument._isDelete”: {
$eq: [
“$operationType”,
“delete”
]
}
}
},
{
$replaceRoot: {
newRoot: “$fullDocument”
}
},
{
$merge: {
into: {
connectionName: “atlas2”,
db: “db1”,
coll: “coll1”
},
whenMatched: {
$cond: {
if: “$_isDelete”,
then: “delete”,
else: “replace”
}
},
whenNotMatched: {
$cond: {
if: “$_isDelete”,
then: “discard”,
else: “insert”
}
},
}
}

Voltar

$emit

Nesta página