Docs Menu
Docs Home
/ /
/ / /

$merge (Procesamiento de flujo)

El La etapa$merge especifica una conexión en el Registro de conexión donde se escribirán los mensajes. La conexión debe ser una conexión Atlas.

$merge debe ser la última etapa de cualquier canalización en la que aparezca. Solo puede utilizar una etapa $merge por canalización.

Una etapa de pipeline $merge tiene la siguiente forma de prototipo:

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>",
"whenNotMatched": "insert | discard | expression",
"parallelism": <integer>
}
}

La versión de $merge para Atlas Stream Processing utiliza la mayoría de los mismos campos que la versión de Atlas Data Federation. Atlas Stream Processing también utiliza los siguientes campos, que son exclusivos de su implementación de $merge o se han modificado para adaptarlos. Para obtener más información sobre los campos compartidos con Atlas Data Federation,$merge consulte la sintaxis de $merge.

Campo
Necesidad
Descripción

into

Requerido

Simplificado para reflejar que el procesamiento de flujo Atlas admite $merge solo en conexiones Atlas.

Para obtener más información, consulte esta descripción de los $merge campos de Atlas Data Federation.

whenMatched

Opcional

Amplía la funcionalidad en comparación con la etapa Atlas Data Federation $merge con soporte para "delete" y expresiones dinámicas.

Cuando se establece en "delete", Atlas elimina todos los mensajes que coinciden con la condición de la colección de destino.

Si usas un valor de expresión dinámica, este debe resolverse en una de las siguientes cadenas:

  • "merge"

  • "replace"

  • "keepExisting"

  • "delete"

  • "<pipeline>"

  • "<expression>"

whenNotMatched

Opcional

Extiende la funcionalidad en comparación con la etapa $merge de Atlas Data Federation con soporte para expresiones dinámicas.

Si usas un valor de expresión dinámica, este debe resolverse en una de las siguientes cadenas:

  • "insert"

  • "discard"

  • "expression"

parallelism

Condicional

Número de subprocesos a los que se distribuirán las operaciones de escritura. Debe ser un valor entero entre 1 y 16. Un valor de paralelismo más alto aumenta el rendimiento. Sin embargo, un valor más alto también requiere que el procesador de flujo y el clúster donde escribe utilicen más recursos computacionales.

Si utilizas un valor de expresión dinámica para into.coll o into.db, no puedes establecer este valor mayor que 1.

Cada procesador de flujo tiene un valor máximo de paralelismo acumulado, determinado por su nivel. El paralelismo acumulado de un procesador de flujo se calcula de la siguiente manera:

parallelism total - parallelized stages

Donde parallelism total es la suma de todos los parallelism valores mayores que 1 en las etapas,$source $lookup y $merge, y parallelized stages es el número de estas etapas con parallelism valores mayores 1 que.

Por ejemplo, si su etapa $source establece un valor parallelism de 4, su etapa $lookup no establece ningún valor parallelism (por lo tanto, el valor predeterminado es 1) y su etapa $merge establece un valor parallelism de 2, entonces tiene dos parallelized stages y el paralelismo acumulativo de su procesador de flujo se calcula como (4 + 2) - 2.

Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo.

El on campo tiene requisitos especiales para $merge en colecciones fragmentadas. Para obtener más información, consulte la sintaxis de $merge.

Si utiliza un valor de expresión dinámica para into.coll o into.db, no podrá establecer un valor parallelism mayor que 1.

$merge No se puede escribir en colecciones de series temporales. Para escribir documentos en colecciones de series temporales, use la etapa $emit.

Debe tener el rol de administrador de Atlas para usar $merge en colecciones fragmentadas.

Puede utilizar una expresión dinámica como valor de los siguientes campos:

  • into.db

  • into.coll

Esto permite que su procesador de flujo escriba mensajes en diferentes colecciones Atlas de destino mensaje por mensaje.

Ejemplo

Tiene un flujo de eventos de transacción que genera mensajes del siguiente formato:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para ordenar cada uno de ellos en una base de datos y colección Atlas distintas, puede escribir la siguiente etapa $merge:

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}
}

Esta $merge etapa:

  • Escribe el mensaje Very Important Industries en una colección Atlas llamada VIP.subscription.

  • Escribe el mensaje N. E. Buddy en una colección Atlas llamada employee.requisition.

  • Escribe el mensaje Khan Traktor en una colección Atlas llamada contractor.billableHours.

Solo se pueden usar expresiones dinámicas que evalúen cadenas. Para obtener más información sobre expresiones dinámicas, consulte Operadores de expresión.

Si especifica una base de datos o colección con una expresión dinámica, pero Atlas Stream Processing no puede evaluar la expresión para un mensaje dado, Atlas Stream Processing envía ese mensaje a la cola de mensajes no entregados, si está configurada, y procesa los mensajes subsiguientes. Si no hay una cola de mensajes no entregados configurada, Atlas Stream Processing omite el mensaje por completo y procesa los mensajes subsiguientes.

Para guardar datos de transmisión de varios temas de Apache KafkaPara integrar los datos en las colecciones de su clúster Atlas, utilice la $merge etapa con la etapa.$source La $source etapa especifica los temas desde los que se leerán los datos. La $merge etapa escribe los datos en la colección de destino.

Utilice la siguiente sintaxis:

{
"$source": {
"connectionName": "<registered-kafka-connection>",
"topic": [ "<topic-name-1>", "<topic-name-2>", ... ]
}
},
{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
}
},
...
}

Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación consta de tres etapas:

  1. La etapa establece una $source conexión con el agente de Apache Kafka que recopila estos informes en un tema my_weatherdata llamado, exponiendo cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndolo ingestionTime en.

  2. La etapa excluye los documentos que tienen $match un dewPoint.value menor o igual a 5.0 y pasa los documentos con dewPoint.value mayor que 5.0 a la siguiente etapa.

  3. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Nota

El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

Puede utilizar $merge.whenMatched $merge.whenNotMatched los parámetros y para replicar los efectos de los eventos de Change Stream según su tipo de operación.

La siguiente agregación tiene cuatro etapas:

  1. La etapa establece una conexión con $source la db1.coll1 colección en un clúster Atlas a través de la atlas1 conexión.

  2. La etapa enriquece los documentos ingeridos con $addFields un fullDocument._isDelete campo establecido con el valor de una comprobación de igualdad entre el "$operationType valor de cada documento "delete" y. Esta igualdad se evalúa como un valor booleano.

  3. La etapa reemplaza el documento con el valor $replaceRoot del $fullDocument campo enriquecido.

  4. La etapa $merge escribe en db1.coll1 a través de la conexión atlas2 y realiza dos comprobaciones en cada documento:

    • Primero, el campo whenMatched comprueba si el documento coincide con un documento existente en la colección db1.coll1 mediante _id, el campo de coincidencia predeterminado, ya que on no está configurado explícitamente. Si coincide y fullDocument._isDelete está configurado como true, Atlas elimina el documento coincidente. Si coincide y fullDocument._isDelete está configurado como false, Atlas reemplaza el documento coincidente con el nuevo documento de la fuente de datos de streaming.

    • En segundo lugar, si Atlas Stream Processing no encuentra ningún documento coincidente y fullDocument._isDelete es verdadero, Atlas descarta el documento en lugar de agregarlo a la colección. Si no existe dicho documento coincidente y fullDocument._isDelete es falso, Atlas inserta el documento de la fuente de datos de streaming en la colección.

{
$source: {
connectionName: “atlas1”,
db: “db1”,
coll: “coll1”,
fullDocument: “required”
}
},
{
$addFields: {
“fullDocument._isDelete”: {
$eq: [
“$operationType”,
“delete”
]
}
}
},
{
$replaceRoot: {
newRoot: “$fullDocument”
}
},
{
$merge: {
into: {
connectionName: “atlas2”,
db: “db1”,
coll: “coll1”
},
whenMatched: {
$cond: {
if: “$_isDelete”,
then: “delete”,
else: “replace”
}
},
whenNotMatched: {
$cond: {
if: “$_isDelete”,
then: “discard”,
else: “insert”
}
},
}
}

Volver

$emit

En esta página