Definición
La La etapa$merge especifica una conexión en el Registro de conexión donde se escribirán los mensajes. La conexión debe ser una conexión Atlas.
Ubicación
$merge debe ser la última etapa de cualquier canalización en la que aparezca. Solo puede utilizar una etapa $merge por canalización.
Una etapa de pipeline $merge tiene la siguiente forma de prototipo:
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>", "whenNotMatched": "insert | discard | expression", "parallelism": <integer> } }
Sintaxis
La versión de Stream Processing de Atlas de $merge utiliza la mayoría de los mismos campos que la versión de Atlas Data Federation. Atlas Stream Processing también utiliza los siguientes campos que son únicos de su implementación de $merge, o se modifican para adaptarse a él. Para saber más sobre los campos que se comparten con Atlas Data Federation $merge, consulta Sintaxis de $merge.
Campo | Necesidad | Descripción |
|---|---|---|
| Requerido | Simplificado para reflejar que el procesamiento de flujo Atlas admite Para obtener más información, consulta esta descripción de los campos de Atlas Data Federation |
| Opcional | Amplía la funcionalidad en comparación con la etapa de Atlas Data Federation Cuando se establece en Si usas un valor de expresión dinámica, este debe resolverse en una de las siguientes cadenas:
|
| Opcional | Extiende la funcionalidad en comparación con la etapa Si usas un valor de expresión dinámica, este debe resolverse en una de las siguientes cadenas:
|
| Condicional | Número de hilos a los que distribuir las operaciones de guardado. Debe ser un valor entero entre Si utilizas un valor de expresión dinámica para Cada procesador de flujo tiene un valor máximo de paralelismo acumulado determinado por su nivel. El paralelismo acumulativo de un procesador de flujo se calcula de la siguiente manera:
Donde Por ejemplo, si tu etapa Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo. |
Comportamiento
Limitaciones
El on campo tiene requisitos especiales para $merge en colecciones fragmentadas. Para obtener más información, consulte la sintaxis de $merge.
$merge no se puede guardar en colecciones de series de tiempo. Para guardar documentos en colecciones de series de tiempo, utiliza la etapa $emit.
Debes tener rol de administrador de Atlas para utilizar $merge en colecciones particionadas.
Expresiones dinámicas
Puedes utilizar una expresión dinámica como valor de los siguientes campos:
into.dbinto.coll
Esto permite que tu procesador de flujo escriba mensajes en diferentes colecciones de Atlas objetivo en función de cada mensaje.
Ejemplo
Tienes un flujo de eventos de transacciones que genera mensajes de la siguiente forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para ordenar cada uno de ellos en una base de datos y colección Atlas distintas, puede escribir la siguiente etapa $merge:
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" } }
Esta $merge etapa:
Escribe el mensaje
Very Important Industriesen una colección Atlas llamadaVIP.subscription.Escribe el mensaje
N. E. Buddyen una colección Atlas llamadaemployee.requisition.Escribe el mensaje
Khan Traktoren una colección Atlas llamadacontractor.billableHours.
Solo se pueden usar expresiones dinámicas que evalúen cadenas. Para obtener más información sobre expresiones dinámicas, consulte Operadores de expresión.
Si especificas una base de datos o colección con una expresión dinámica, pero Atlas Stream Processing no puede evaluar la expresión para un mensaje dado, Atlas Stream Processing envía ese mensaje a la fila de letra muerta si está configurada y procesa los mensajes siguientes. Si no se configura una fila de letra muerta, entonces Atlas Stream Processing omite el mensaje por completo y procesa los mensajes siguientes.
Guardando datos de temas de Kafka
Para guardar datos de transmisión de múltiples Temas de Apache Kafka en colecciones de tu clúster de Atlas, utiliza la etapa $merge con la etapa $source. La etapa $source especifica los temas desde los que leer datos. La etapa $merge escribe los datos en la colección de destino.
Utiliza la siguiente sintaxis:
{ "$source": { "connectionName": "<registered-kafka-connection>", "topic": [ "<topic-name-1>", "<topic-name-2>", ... ] } }, { "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> } }, ... }
Ejemplos
Ejemplo básico
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:
La etapa
$sourceestablece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominadomy_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo eningestionTime.La etapa
$matchexcluye los documentos que tienen undewPoint.valuemenor o igual a5.0y pasa los documentos condewPoint.valuemayor que5.0a la siguiente etapa.La etapa escribe la salida en una colección de Atlas
$mergellamadastreamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.
Replicar eventos de flujo de cambios en los clústeres de Atlas
Puede utilizar los parámetros $merge.whenMatched y $merge.whenNotMatched para replicar los efectos de los eventos de Cambio de stream según su tipo de operación.
La siguiente agregación tiene cuatro etapas:
La etapa
$sourceestablece una conexión a la coleccióndb1.coll1en un clúster de Atlas a través de la conexiónatlas1.La etapa
$addFieldsenriquece los documentos ingresados con un campofullDocument._isDeleteestablecido en el valor de una verificación de igualdad entre el valor"$operationTypede cada documento y"delete". Esta igualdad evalúa a un booleano.La etapa
$replaceRootsustituye el documento con el valor del campo enriquecido$fullDocument.La etapa
$mergeescribe endb1.coll1a través de la conexiónatlas2, realizando dos comprobaciones en cada documento:Primero, el campo
whenMatchedverifica si el documento coincide con un documento existente en la coleccióndb1.coll1por_id, el campo de coincidencia predeterminado ya queonno está definido explícitamente. Si es así yfullDocument._isDeleteestá configurado entrue, Atlas elimina el documento correspondiente. Si coincide yfullDocument._isDeleteestá configurado enfalse, Atlas sustituye el documento coincidente con el nuevo de la fuente de datos en transmisión.En segundo lugar, si Atlas Stream Processing no encuentra ningún documento coincidente y
fullDocument._isDeletees verdadero, Atlas descarta el documento en lugar de escribirlo en la colección. Si no existe un documento que coincida de esta manera yfullDocument._isDeletees falso, Atlas inserta el documento de la fuente de datos de transmisión en la colección.
{ $source: { connectionName: “atlas1”, db: “db1”, coll: “coll1”, fullDocument: “required” } }, { $addFields: { “fullDocument._isDelete”: { $eq: [ “$operationType”, “delete” ] } } }, { $replaceRoot: { newRoot: “$fullDocument” } }, { $merge: { into: { connectionName: “atlas2”, db: “db1”, coll: “coll1” }, whenMatched: { $cond: { if: “$_isDelete”, then: “delete”, else: “replace” } }, whenNotMatched: { $cond: { if: “$_isDelete”, then: “discard”, else: “insert” } }, } }