Definição
A fase $merge especifica uma conexão no registro de conexão para gravar mensagens. A conexão deve ser uma conexão do Atlas.
Um estágio de pipeline do $merge tem a seguinte forma de protótipo:
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>", "whenNotMatched": "insert | discard | expression", "parallelism": <integer> } }
Sintaxe
A versão Atlas Stream Processing do $merge usa a maioria dos mesmos campos que a versão Atlas Data Federation. O Atlas Stream Processing também utiliza os seguintes campos que são exclusivos de sua implementação de $merge ou foram modificados para se adequar a ela. Para aprender mais sobre os campos compartilhados com o Atlas Data Federation $merge, consulte sintaxe $merge.
Campo | necessidade | Descrição |
|---|---|---|
| Obrigatório | Simplificado para reproduzir o suporte do Atlas Stream Processing para Para aprender mais, consulte esta descrição dos campos do Atlas Data Federation |
| Opcional | Amplia a funcionalidade em comparação com o estágio Quando configurado para Ao usar um valor de expressão dinâmica, ele deverá ser resolvido para uma das seguintes strings:
|
| Opcional | Amplia a funcionalidade em comparação com o estágio Ao usar um valor de expressão dinâmica, ele deverá ser resolvido para uma das seguintes strings:
|
| Condicional | Número de threads para distribuir operações de gravação. Deve ser um valor inteiro entre Se utilizar um valor de expressão dinâmica para |
Comportamento
Limitações
$merge deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $merge por pipeline.
O campo on tem requisitos especiais para $merge em relação a coleções fragmentadas. Para saber mais, consulte Sintaxe $merge.
Se utilizar um valor de expressão dinâmica para into.coll ou into.db, não poderá definir um valor de parallelism superior a 1.
$merge não é possível gravar em coleções de séries temporais. Para gravar documentos em coleções de séries temporais, utilize o estágio $emit.
Você precisa ter a função de administrador do Atlas para usar $merge em coleções fragmentadas.
Expressões Dinâmicas
Você pode usar uma expressão dinâmica como o valor dos seguintes campos:
into.dbinto.coll
Isso permite que seu processador de fluxo grave mensagens em diferentes collection de destino do Atlas, mensagem por mensagem.
Exemplo
Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para classificar cada um deles em um reconhecimento de data center e collection Atlas distintos, você pode escrever o seguinte estágio $merge :
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" } }
Este estágio $merge :
Escreve a mensagem
Very Important Industriespara uma collection Atlas denominadaVIP.subscription.Escreve a mensagem
N. E. Buddypara uma collection Atlas denominadaemployee.requisition.Escreve a mensagem
Khan Traktorpara uma collection Atlas denominadacontractor.billableHours.
Você só pode usar expressões dinâmicas que avaliam para strings. Para obter mais informações sobre expressões dinâmicas, consulte operadores de expressão.
Se você especificar um banco de dados ou uma coleção com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão para uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a fila de mensagens não entregues, se configurada, e processará as mensagens subsequentes. Se não houver uma fila de mensagens não entregues configurada, o Atlas Stream Processing ignorará completamente a mensagem e processará as mensagens subsequentes.
Salvando dados de tópicos do Kafka
Para salvar dados de streaming de vários Apache Kafka Topics em coleções em seu cluster Atlas , use o $merge estágio com o $source estágio. O estágio $source especifica os tópicos a partir dos quais ler os dados. O estágio $merge grava os dados na coleção de destino.
Use a seguinte sintaxe:
{ "$source": { "connectionName": "<registered-kafka-connection>", "topic": [ "<topic-name-1>", "<topic-name-2>", ... ] } }, { "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> } }, ... }
Exemplos
Exemplo básico
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio
$sourceestabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamadomy_weatherdata, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime.O estágio
$matchexclui documentos que têm umdewPoint.valuemenor ou igual a5.0e passa os documentos comdewPoint.valuemaior que5.0para o próximo estágio.O estágio
$mergegrava a saída na coleção do Atlas chamadastreamno banco de dadossample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.
Replicar eventos de fluxo de alteração
Você pode usar os parâmetros $merge.whenMatched e $merge.whenNotMatched para replicar os efeitos dos eventos de fluxo de alteração de acordo com o tipo de operação.
A seguinte agregação tem quatro fases:
O estágio
$sourceestabelece uma conexão com a coleçãodb1.coll1em um cluster do Atlas por meio da conexãoatlas1.O estágio
$addFieldsenriquece os documentos ingeridos com um campofullDocument._isDeletedefinido como o resultado de uma verificação de igualdade entre o valor"$operationTypede cada documento e"delete". Essa igualdade é considerada para um valor booleano.O estágio
$replaceRootsubstitui o documento pelo valor do campo$fullDocumentenriquecido.O estágio
$mergegrava emdb1.coll1através da conexãoatlas2, realizando duas verificações em cada documento:Primeiro, o campo
whenMatchedverifica se o documento corresponde a um documento existente na coleçãodb1.coll1usando_id, que é o campo de correspondência padrão, já queonnão está explicitamente definido. Se isso acontecer efullDocument._isDeleteestiver definido comotrue, então o Atlas exclui o documento correspondente. Se corresponder efullDocument._isDeleteestiver definido comofalse, o Atlas substituirá o documento correspondente pelo novo da fonte de dados de streaming.Em segundo lugar, se o Atlas Stream Processing não encontrar tal documento correspondente e
fullDocument._isDeletefor verdadeiro, o Atlas descartará o documento em vez de gravá-lo na coleção. Se não houver um documento correspondente efullDocument._isDeletefor falso, o Atlas insere o documento da fonte de dados de streaming na coleção.
{ $source: { connectionName: “atlas1”, db: “db1”, coll: “coll1”, fullDocument: “required” } }, { $addFields: { “fullDocument._isDelete”: { $eq: [ “$operationType”, “delete” ] } } }, { $replaceRoot: { newRoot: “$fullDocument” } }, { $merge: { into: { connectionName: “atlas2”, db: “db1”, coll: “coll1” }, whenMatched: { $cond: { if: “$_isDelete”, then: “delete”, else: “replace” } }, whenNotMatched: { $cond: { if: “$_isDelete”, then: “discard”, else: “insert” } }, } }