Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
/ / /

$merge (Procesamiento de flujo)

La La etapa$merge especifica una conexión en el Registro de conexión donde se escribirán los mensajes. La conexión debe ser una conexión Atlas.

$merge debe ser la última etapa de cualquier canalización en la que aparezca. Solo puede utilizar una etapa $merge por canalización.

Una etapa de pipeline $merge tiene la siguiente forma de prototipo:

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>",
"whenNotMatched": "insert | discard | expression",
"parallelism": <integer>
}
}

La versión de Stream Processing de Atlas de $merge utiliza la mayoría de los mismos campos que la versión de Atlas Data Federation. Atlas Stream Processing también utiliza los siguientes campos que son únicos de su implementación de $merge, o se modifican para adaptarse a él. Para saber más sobre los campos que se comparten con Atlas Data Federation $merge, consulta Sintaxis de $merge.

Campo
Necesidad
Descripción

into

Requerido

Simplificado para reflejar que el procesamiento de flujo Atlas admite $merge solo en conexiones Atlas.

Para obtener más información, consulta esta descripción de los campos de Atlas Data Federation $merge.

whenMatched

Opcional

Amplía la funcionalidad en comparación con la etapa de Atlas Data Federation $merge con soporte para "delete" y expresiones dinámicas.

Cuando se establece en "delete", Atlas elimina todos los mensajes que coinciden con la condición de la colección de destino.

Si usas un valor de expresión dinámica, este debe resolverse en una de las siguientes cadenas:

  • "merge"

  • "replace"

  • "keepExisting"

  • "delete"

  • "<pipeline>"

  • "<expression>"

whenNotMatched

Opcional

Extiende la funcionalidad en comparación con la etapa $merge de Atlas Data Federation con soporte para expresiones dinámicas.

Si usas un valor de expresión dinámica, este debe resolverse en una de las siguientes cadenas:

  • "insert"

  • "discard"

  • "expression"

parallelism

Condicional

Número de hilos a los que distribuir las operaciones de guardado. Debe ser un valor entero entre 1 y 16. Los valores más altos de paralelización aumentan el rendimiento. Sin embargo, valores superiores también requieren que el procesador de flujo y el clúster al que escribe utilicen más recursos computacionales.

Si utilizas un valor de expresión dinámica para into.coll o into.db, no puedes establecer este valor mayor que 1.

Cada procesador de flujo tiene un valor máximo de paralelismo acumulado determinado por su nivel. El paralelismo acumulativo de un procesador de flujo se calcula de la siguiente manera:

parallelism total - parallelized stages

Donde parallelism total es la suma de todos los valores parallelism mayores que 1 a través de las etapas $source, $lookup, y $merge, y parallelized stages es la cantidad de estas etapas con valores parallelism mayores que 1.

Por ejemplo, si tu etapa $source establece un valor parallelism de 4, tu etapa $lookup no define un valor parallelism (por lo tanto, se toma el valor por defecto 1), y tu etapa $merge define un valor parallelism de 2, entonces tiene dos parallelized stages y el paralelismo acumulativo de su procesador de flujo se calcula como (4 + 2) - 2.

Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo.

El on campo tiene requisitos especiales para $merge en colecciones fragmentadas. Para obtener más información, consulte la sintaxis de $merge.

$merge no se puede guardar en colecciones de series de tiempo. Para guardar documentos en colecciones de series de tiempo, utiliza la etapa $emit.

Debes tener rol de administrador de Atlas para utilizar $merge en colecciones particionadas.

Puedes utilizar una expresión dinámica como valor de los siguientes campos:

  • into.db

  • into.coll

Esto permite que tu procesador de flujo escriba mensajes en diferentes colecciones de Atlas objetivo en función de cada mensaje.

Ejemplo

Tienes un flujo de eventos de transacciones que genera mensajes de la siguiente forma:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para ordenar cada uno de ellos en una base de datos y colección Atlas distintas, puede escribir la siguiente etapa $merge:

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}
}

Esta $merge etapa:

  • Escribe el mensaje Very Important Industries en una colección Atlas llamada VIP.subscription.

  • Escribe el mensaje N. E. Buddy en una colección Atlas llamada employee.requisition.

  • Escribe el mensaje Khan Traktor en una colección Atlas llamada contractor.billableHours.

Solo se pueden usar expresiones dinámicas que evalúen cadenas. Para obtener más información sobre expresiones dinámicas, consulte Operadores de expresión.

Si especificas una base de datos o colección con una expresión dinámica, pero Atlas Stream Processing no puede evaluar la expresión para un mensaje dado, Atlas Stream Processing envía ese mensaje a la fila de letra muerta si está configurada y procesa los mensajes siguientes. Si no se configura una fila de letra muerta, entonces Atlas Stream Processing omite el mensaje por completo y procesa los mensajes siguientes.

Para guardar datos de transmisión de múltiples Temas de Apache Kafka en colecciones de tu clúster de Atlas, utiliza la etapa $merge con la etapa $source. La etapa $source especifica los temas desde los que leer datos. La etapa $merge escribe los datos en la colección de destino.

Utiliza la siguiente sintaxis:

{
"$source": {
"connectionName": "<registered-kafka-connection>",
"topic": [ "<topic-name-1>", "<topic-name-2>", ... ]
}
},
{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
}
},
...
}

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:

  1. La etapa $source establece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominado my_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo en ingestionTime.

  2. La etapa $match excluye los documentos que tienen un dewPoint.value menor o igual a 5.0 y pasa los documentos con dewPoint.value mayor que 5.0 a la siguiente etapa.

  3. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.

Puede utilizar los parámetros $merge.whenMatched y $merge.whenNotMatched para replicar los efectos de los eventos de Cambio de stream según su tipo de operación.

La siguiente agregación tiene cuatro etapas:

  1. La etapa $source establece una conexión a la colección db1.coll1 en un clúster de Atlas a través de la conexión atlas1.

  2. La etapa $addFields enriquece los documentos ingresados con un campo fullDocument._isDelete establecido en el valor de una verificación de igualdad entre el valor "$operationType de cada documento y "delete". Esta igualdad evalúa a un booleano.

  3. La etapa $replaceRoot sustituye el documento con el valor del campo enriquecido $fullDocument.

  4. La etapa $merge escribe en db1.coll1 a través de la conexión atlas2, realizando dos comprobaciones en cada documento:

    • Primero, el campo whenMatched verifica si el documento coincide con un documento existente en la colección db1.coll1 por _id, el campo de coincidencia predeterminado ya que on no está definido explícitamente. Si es así y fullDocument._isDelete está configurado en true, Atlas elimina el documento correspondiente. Si coincide y fullDocument._isDelete está configurado en false, Atlas sustituye el documento coincidente con el nuevo de la fuente de datos en transmisión.

    • En segundo lugar, si Atlas Stream Processing no encuentra ningún documento coincidente y fullDocument._isDelete es verdadero, Atlas descarta el documento en lugar de escribirlo en la colección. Si no existe un documento que coincida de esta manera y fullDocument._isDelete es falso, Atlas inserta el documento de la fuente de datos de transmisión en la colección.

{
$source: {
connectionName: “atlas1”,
db: “db1”,
coll: “coll1”,
fullDocument: “required”
}
},
{
$addFields: {
“fullDocument._isDelete”: {
$eq: [
“$operationType”,
“delete”
]
}
}
},
{
$replaceRoot: {
newRoot: “$fullDocument”
}
},
{
$merge: {
into: {
connectionName: “atlas2”,
db: “db1”,
coll: “coll1”
},
whenMatched: {
$cond: {
if: “$_isDelete”,
then: “delete”,
else: “replace”
}
},
whenNotMatched: {
$cond: {
if: “$_isDelete”,
then: “discard”,
else: “insert”
}
},
}
}

Volver

$emit

En esta página