Definición
La etapa $merge especifica una conexión en el Registro de Conexiones para escribir mensajes. La conexión debe ser una conexión Atlas.
Ubicación
$merge debe ser la última etapa de cualquier pipeline en la que aparezca. Solo se puede utilizar una etapa $merge por pipeline.
Una etapa de pipeline $merge tiene la siguiente forma de prototipo:
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>", "whenNotMatched": "insert | discard | expression", "parallelism": <integer> } }
Sintaxis
La versión de $merge para Atlas Stream Processing utiliza la mayoría de los mismos campos que la versión de Atlas Data Federation. Atlas Stream Processing también utiliza los siguientes campos, que son exclusivos de su implementación de $merge o se han modificado para adaptarlos. Para obtener más información sobre los campos compartidos con Atlas Data Federation,$merge consulte la sintaxis de $merge.
Campo | Necesidad | Descripción |
|---|---|---|
| Requerido | Simplificado para reflejar que Atlas Stream Processing admite Para obtener más información, consulta esta descripción de los campos de Atlas Data Federation |
| Opcional | Amplía la funcionalidad en comparación con la etapa de Atlas Data Federation Cuando se establece en Si usas un valor de expresión dinámica, este debe resolverse en una de las siguientes cadenas:
|
| Opcional | Extiende la funcionalidad en comparación con la etapa Si usas un valor de expresión dinámica, este debe resolverse en una de las siguientes cadenas:
|
| Condicional | Número de hilos a los que distribuir las operaciones de guardado. Debe ser un valor entero entre Si utilizas un valor de expresión dinámica para Cada procesador de flujo tiene un valor máximo de paralelismo acumulado determinado por su nivel. El paralelismo acumulativo de un procesador de flujo se calcula de la siguiente manera:
Donde Por ejemplo, si tu etapa Si un procesador de flujo supera el paralelismo acumulativo máximo para su nivel, Atlas Stream Processing genera un error e informa del nivel mínimo de procesador requerido para el nivel de paralelismo previsto. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para aprender más, consulte Procesamiento de flujos. |
Comportamiento
Limitaciones
El campo on tiene requisitos especiales para $merge contra colecciones particionadas. Para obtener más información, consulte la sintaxis de $merge.
$merge no se puede guardar en colecciones de series de tiempo. Para guardar documentos en colecciones de series de tiempo, utiliza la etapa $emit.
Debes tener rol de administrador de Atlas para utilizar $merge en colecciones particionadas.
Expresiones dinámicas
Puedes utilizar una expresión dinámica como valor de los siguientes campos:
into.dbinto.coll
Esto permite que tu procesador de flujo escriba mensajes en diferentes colecciones de Atlas objetivo en función de cada mensaje.
Ejemplo
Tienes un flujo de eventos de transacciones que genera mensajes de la siguiente forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para clasificar cada uno de estos en una base de datos y colección distintas de Atlas, puedes escribir la siguiente etapa $merge:
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" } }
Esta $merge etapa:
Escribe el mensaje
Very Important Industriesen una colección Atlas llamadaVIP.subscription.Escribe el mensaje
N. E. Buddyen una colección Atlas llamadaemployee.requisition.Escribe el mensaje
Khan Traktoren una colección Atlas llamadacontractor.billableHours.
Solo se pueden usar expresiones dinámicas que se evalúen como cadenas de texto. Para obtener más información sobre expresiones dinámicas, consulta los operadores de expresiones.
Si especificas una base de datos o colección con una expresión dinámica, pero Atlas Stream Processing no puede evaluar la expresión para un mensaje dado, Atlas Stream Processing envía ese mensaje a la fila de letra muerta si está configurada y procesa los mensajes siguientes. Si no se configura una fila de letra muerta, entonces Atlas Stream Processing omite el mensaje por completo y procesa los mensajes siguientes.
Guardando datos de temas de Kafka
Para guardar datos de transmisión de varios Temas de Apache Kafka en colecciones de tu clúster de Atlas, utiliza la etapa $merge junto con la etapa $source. La etapa $source especifica los temas de los que se debe leer datos. La etapa $merge escribe los datos en la colección objetivo.
Utiliza la siguiente sintaxis:
{ "$source": { "connectionName": "<registered-kafka-connection>", "topic": [ "<topic-name-1>", "<topic-name-2>", ... ] } }, { "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> } }, ... }
Ejemplos
Ejemplo básico
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:
La etapa
$sourceestablece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominadomy_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo eningestionTime.La etapa
$matchexcluye los documentos que tienen undewPoint.valuemenor o igual a5.0y pasa los documentos condewPoint.valuemayor que5.0a la siguiente etapa.La fase
$mergeescribe la salida en una colección Atlas denominadastreamen la base de datossample_weatherstream. Si no existe tal base de datos o colección, Atlas los creará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos de la colección sample_weatherstream.stream resultante, conéctate a un clúster de Atlas y ejecuta el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.
Replicar eventos del flujo de cambios en clústeres de Atlas
Puede utilizar los parámetros $merge.whenMatched y $merge.whenNotMatched para replicar los efectos de los eventos de Cambio de stream según su tipo de operación.
La siguiente agregación tiene cuatro etapas:
La etapa
$sourceestablece una conexión a la coleccióndb1.coll1en un clúster de Atlas a través de la conexiónatlas1.La etapa
$addFieldsenriquece los documentos ingresados con un campofullDocument._isDeleteestablecido en el valor de una verificación de igualdad entre el valor"$operationTypede cada documento y"delete". Esta igualdad evalúa a un booleano.La etapa
$replaceRootsustituye el documento con el valor del campo enriquecido$fullDocument.La etapa
$mergeescribe endb1.coll1a través de la conexiónatlas2, realizando dos comprobaciones en cada documento:Primero, el campo
whenMatchedverifica si el documento coincide con un documento existente en la coleccióndb1.coll1por_id, el campo de coincidencia predeterminado ya queonno está definido explícitamente. Si es así yfullDocument._isDeleteestá configurado entrue, Atlas elimina el documento correspondiente. Si coincide yfullDocument._isDeleteestá configurado enfalse, Atlas sustituye el documento coincidente con el nuevo de la fuente de datos en transmisión.En segundo lugar, si Atlas Stream Processing no encuentra ningún documento coincidente y
fullDocument._isDeletees verdadero, Atlas descarta el documento en lugar de escribirlo en la colección. Si no existe un documento que coincida de esta manera yfullDocument._isDeletees falso, Atlas inserta el documento de la fuente de datos de transmisión en la colección.
{ $source: { connectionName: “atlas1”, db: “db1”, coll: “coll1”, fullDocument: “required” } }, { $addFields: { “fullDocument._isDelete”: { $eq: [ “$operationType”, “delete” ] } } }, { $replaceRoot: { newRoot: “$fullDocument” } }, { $merge: { into: { connectionName: “atlas2”, db: “db1”, coll: “coll1” }, whenMatched: { $cond: { if: “$_isDelete”, then: “delete”, else: “replace” } }, whenNotMatched: { $cond: { if: “$_isDelete”, then: “discard”, else: “insert” } }, } }