Docs Menu
Docs Home
/ /

$merge (etapa de agregación)

Nota

Esta página describe el $merge etapa, que genera los resultados de la canalización de agregación en una colección. Para el $mergeObjects Operador, que fusiona documentos en un solo documento,$mergeObjects consulte.

$merge

Escribe los resultados de la canalización de agregación en una colección especificada. El operador$merge debe ser la última etapa de pipeline.

La etapa de $merge:

  • Se puede enviar a una colección en la misma base de datos o en una base de datos diferente.

  • Puede generar salida a la misma colección que se está agregando. Para obtener más información, consulta Salida a la misma colección que se está agregando.

  • Considera los siguientes puntos al usar las etapas $merge o $out en un pipeline de agregación:

    • A partir de MongoDB 5.0, los pipelines con una etapa $merge pueden ejecutarse en nodos secundarios del set de réplicas si todos los nodos del clúster tienen la featureCompatibilityVersion establecida en 5.0 o superior y la preferencia de lectura permite lecturas secundarias.

      • $merge y $out se ejecutan en nodos secundarios, pero las operaciones de escritura se envían al nodo primario.

      • No todas las versiones de los drivers son compatibles con las operaciones $merge enviadas a los nodos secundarios. Para obtener más detalles, consulte la documentación del driver.

    • En versiones anteriores de MongoDB, los pipelines con etapas de $out o $merge siempre se ejecutan en el nodo primario y no se considera la preferencia de lectura.

  • Cree una nueva colección si la colección de salida no existe ya.

  • Puede incorporar resultados (insertar nuevos documentos, fusionar documentos, reemplazar documentos, mantener documentos existentes, fallar la operación, procesar documentos con un pipeline de actualización personalizado) en una colección existente.

  • Puede dar salida a una colección fragmentada. La colección de entrada también se puede fragmentar.

Para una comparación con la etapa $out que también genera los resultados de la agregación en una colección, consulta Comparación de $merge y $out.

Nota

Vistas materializadas on-demand

$merge puede incorporar los resultados del pipeline en una colección de salida existente en lugar de realizar un reemplazo completo de la colección. Esta funcionalidad permite a los usuarios crear vistas materializadas on-demand, donde el contenido de la colección de salida se actualiza de forma incremental cuando se ejecuta el pipeline.

Para obtener más información sobre este caso de uso, consulta Vistas materializadas on-demand así como los ejemplos de esta página.

Las vistas materializadas son distintas de las vistas de solo lectura. Para obtener información sobre la creación de vistas de solo lectura, consulta vistas de solo lectura.

Puedes usar $merge para implementaciones alojadas en los siguientes entornos:

  • MongoDB Atlas: El servicio totalmente gestionado para implementaciones de MongoDB en la nube

  • MongoDB Enterprise: La versión basada en suscripción y autogestionada de MongoDB

  • MongoDB Community: La versión de MongoDB con código fuente disponible, de uso gratuito y autogestionada.

$merge tiene la siguiente sintaxis:

{ $merge: {
into: <collection> -or- { db: <db>, coll: <collection> },
on: <identifier field> -or- [ <identifier field1>, ...], // Optional
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

Por ejemplo:

{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }

Si utilizas todas las opciones por defecto de $merge, incluida la escritura en una colección de la misma base de datos, puedes utilizar la forma simplificada:

{ $merge: <collection> } // Output collection is in the same database

El $merge procesa un documento con los siguientes campos:

Campo
Descripción

La colección de resultados. Especifica una de las siguientes opciones:

  • El nombre de la colección como un string para enviar a una colección en la misma base de datos donde se ejecuta la agregación. Por ejemplo:

    into: "myOutput"

  • El nombre de la base de datos y de la colección en un documento para enviar a una colección en la base de datos especificada. Por ejemplo:

    into: { db:"myDB", coll:"myOutput" }

Si la colección de salida no existe, $merge crea la colección:

La colección de salida puede ser una colección fragmentada.

Opcional. Campo o campos que funcionan como identificador único de un documento. El identificador determina si un documento de resultados coincide con un documento existente en la colección de salida. Especifique uno de los siguientes:

  • Un solo nombre de campo como un string. Por ejemplo:

    on: "_id"

  • Una combinación de campos en un arreglo. Por ejemplo:

    on: [ "date", "customerId" ]
    The order of the fields in the array does not matter, and you cannot specify the same field multiple times.

Para el campo o los campos especificados:

  • Los documentos de resultados de la agregación deben contener el(los) campo(s) especificado(s) en el on, a menos que el campo on sea el campo _id. Si falta el campo _id en un documento de resultados, MongoDB lo agrega automáticamente.

  • El campo o los campos especificados no pueden contener un valor nulo o de matriz.

$merge requiere un índice único con claves que correspondan a los campos del identificador. Aunque el orden de la especificación de la clave del índice no importa, el índice único solo debe contener los on campos como claves.

  • El índice también debe tener la misma intercalación que la de la agregación.

  • El índice único puede ser un índice disperso.

  • El índice único no puede ser un índice parcial.

  • Para las colecciones de salida que ya existen, el índice correspondiente ya debe existir.

El valor por defecto de on depende de la colección de salida:

  • Si la colección de salida no existe, el identificador on debe ser y por defecto es el campo _id. El índice único correspondiente de _id se crea automáticamente.

  • Si la colección de salida existente no está particionada, el identificador on se asigna por defecto al campo _id.

  • Si la colección de salida existente es una colección fragmentada, el identificador on se asigna por defecto a todos los campos de la clave de fragmentación y al campo _id. Si se especifica un identificador de on diferente, el on debe contener todos los campos de la clave de fragmentación.

Opcional. El comportamiento de $merge si un documento resultante y un documento existente en la colección tienen el mismo valor para el o los campos especificados.

Se puede especificar cualquiera de los dos:

  • Una de las cadenas de acción predefinidas:

    Acción
    Descripción

    Reemplaza el documento existente en la colección de salida con el documento de resultados coincidente.

    Al realizar una sustitución, el documento de reemplazo no puede dar lugar a una modificación del valor _id o, si la colección de salida está particionada, del valor de la clave de partición. De lo contrario, la operación genera un error.

    Para evitar este error, si el campo on no incluye el campo _id, remueve el campo _id en los resultados de la agregación para evitar el error, como con una etapa $unset anterior, y así sucesivamente.

    Mantén el documento existente en la colección de salida.

    "merge" (por defecto)

    Fusionar los documentos coincidentes (similar al operador $mergeObjects).

    • Si el documento de resultados contiene campos que no están en el documento existente, se deben agregar estos nuevos campos al documento existente.

    • Si el documento de resultados contiene campos en el documento existente, se deben reemplazar los valores de los campos existentes con los del documento de resultados.

    Por ejemplo, si la colección de salida contiene el documento:

    { _id: 1, a: 1, b: 1 }

    Y los resultados de la agregación contienen el documento:

    { _id: 1, b: 5, z: 1 }

    Entonces, el documento combinado es:

    { _id: 1, a: 1, b: 5, z: 1 }

    Al realizar una fusión, el documento fusionado no puede resultar en una modificación del valor de _id ni, si la colección de salida está particionada, del valor de la clave de partición. De lo contrario, la operación genera un error.

    Para evitar este error, si el campo on no incluye el campo _id, remueve el campo _id en los resultados de la agregación para evitar el error, como con una etapa $unset anterior, y así sucesivamente.

    Se debe detener y cancelar la operación de agregación. Cualquier cambio en la colección de salida de documentos anteriores no se revierte.

  • Un pipeline de agregación para actualizar el documento en la colección.

    [ <stage1>, <stage2> ... ]

    El pipeline solo puede consistir en las siguientes etapas:

    El pipeline no puede modificar el valor del campo on. Por ejemplo, si estás haciendo coincidir en el campo month, el pipeline no puede modificar el campo month.

    El pipeline whenMatched puede acceder directamente a los campos de los documentos existentes en la colección de salida usando $<field>.

    Para acceder a los campos de los documentos de resultados de agregación, usa cualquiera de las siguientes opciones:

    • La variable $$new de funcionalidad incorporada para acceder al campo. Específicamente, $$new.<field>. La variable $$new solo está disponible si se omite la especificación let.

    • Las variables definidas por el usuario en el campo let.

      Se debe especificar el prefijo del signo de dólar doble ($$) junto con el nombre de la variable en el formato $$<variable_name>. Por ejemplo, $$year. Si la variable está configurada como un documento, también se puede incluir un campo de documento en el formulario $$<variable_name>.<field>. Por ejemplo, $$year.month.

      Para más ejemplos, consulta Usar variables para personalizar la combinación.

Opcional. Especifica variables para uso en el pipeline whenMatched.

Especifica un documento con los nombres de las variables y las expresiones de valores:

{ <variable_name_1>: <expression_1>,
...,
<variable_name_n>: <expression_n> }

Si no se especifica, es por defecto { new: "$$ROOT" } (consulta ROOT). El pipeline whenMatched puede acceder a la variable $$new.

Para acceder a las variables en el pipeline whenMatched:

Se debe especificar el prefijo del signo de dólar doble ($$) junto con el nombre de la variable en el formato $$<variable_name>. Por ejemplo, $$year. Si la variable está configurada como un documento, también se puede incluir un campo de documento en el formulario $$<variable_name>.<field>. Por ejemplo, $$year.month.

Para ejemplos, consulta Usar Variables para personalizar la combinación.

Opcional. El comportamiento de $merge si un documento de resultado no coincide con un documento existente en la colección de salida.

Puedes especificar una de las cadenas de acción predefinidas:

Acción
Descripción

"insert" (Default)

Inserta el documento en la colección de salida.

Desecha el documento. Específicamente, $merge no inserta el documento en la colección de salida.

Se debe detener y cancelar la operación de agregación. Cualquier cambio ya escrito en la colección de salida no se revierte.

Si el campo _id no está presente en un documento de los resultados de la canalización de agregación, la etapa $merge lo genera automáticamente.

Por ejemplo, en la siguiente canalización de agregación, $project excluye el campo _id de los documentos pasados a $merge. Cuando $merge guarda estos documentos en el "newCollection", $merge genera un nuevo campo _id y un valor.

db.sales.aggregate( [
{ $project: { _id: 0 } },
{ $merge : { into : "newCollection" } }
] )

La operación $merge crea una nueva colección si la colección de salida especificada no existe.

  • La colección de salida se crea cuando $merge guarda el primer documento a la colección y es visible inmediatamente.

  • Si la agregación falla, cualquier guardado completado por el $merge antes del error no se revertirá.

Nota

Para un set de réplicas o un autónomo, si la base de datos de salida no existe, $merge también crea la base de datos.

Para un clúster particionado, la base de datos de salida especificada ya debe existir.

Si la colección de salida no existe, $merge requiere que el identificador on sea el campo _id. Para utilizar un valor de campo on diferente para una colección que no existe, primero puedes crear la colección creando un índice único en el(los) campo(s) deseado(s). Por ejemplo, si la colección de salida newDailySales201905 no existe y desea especificar el campo salesDate como el identificador de encendido:

db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } )
db.sales.aggregate( [
{ $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } },
{ $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } },
{ $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } },
{ $merge : { into : "newDailySales201905", on: "salesDate" } }
] )

La etapa $merge puede generar una salida a una colección fragmentada. Cuando la colección de salida está fragmentada, $merge utiliza el campo _id y todos los campos de clave de partición como el identificador en por defecto. Si se anula el valor por defecto, el identificador en debe incluir todos los campos de la clave de fragmentación:

{ $merge: {
into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

Por ejemplo, se debe usar el método sh.shardCollection() para crear una nueva colección particionada newrestaurants con el campo postcode como clave de partición.

sh.shardCollection(
"exampledb.newrestaurants", // Namespace of the collection to shard
{ postcode: 1 }, // Shard key
);

La colección newrestaurants contendrá documentos con información sobre nuevas aperturas de restaurantes por mes (campo date) y código postal (clave de fragmentación); en concreto, el identificador en es ["date", "postcode"] (el orden de los campos no importa). Debido a que $merge requiere un índice único con claves que correspondan a los campos de identificador en, crea el índice único (el orden de los campos no importa): [1]

use exampledb
db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )

Con la colección particionada restaurants y el índice único creado, se puede utilizar $merge para enviar los resultados de la agregación a esta colección, coincidiendo en [ "date", "postcode" ] como en este ejemplo:

use exampledb
db.openings.aggregate([
{ $group: {
_id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" },
restaurants: { $push: "$restaurantName" } } },
{ $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } },
{ $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } }
])
[1] El método sh.shardCollection() también puede crear un índice único en la clave de fragmentación cuando se pasa la opción { unique: true } si: la clave de fragmentación es basada en rango, la colección está vacía y no existe un índice único en la clave de fragmentación. En el ejemplo anterior, dado que el identificador on es la clave de fragmentación y otro campo, se requiere una operación separada para crear el índice correspondiente.

$merge puede reemplazar un documento existente en la colección de salida si los resultados de la agregación contienen un documento o documentos que coincidan según la especificación on. Por lo tanto, $merge puede reemplazar todos los documentos en la colección existente si los resultados de la agregación incluyen documentos coincidentes para todos los documentos existentes en la colección y especificar "reemplazar" para whenMatched.

Sin embargo, para reemplazar una colección existente independientemente de los resultados de la agregación, se debe usar $out en su lugar.

Los $merge errores ocurren si el $merge resulta en un cambio en el valor _id de un documento existente.

Tip

Para evitar este error, si el campo on no incluye el campo _id, remueve el campo _id en los resultados de la agregación para evitar el error, como con una etapa $unset precedente, y así sucesivamente.

Además, para una colección fragmentada, $merge también genera un error si provoca un cambio en el valor de clave de fragmentación de un documento existente.

Cualquier guardado completado por el $merge antes del error no se revertirá.

Si el índice único utilizado por $merge para los campo(s) se descartan a mitad de la agregación, no hay garantía de que la agregación se detenga. Si la agregación continúa, no hay garantía de que los documentos no tengan valores duplicados en el campo on.

Si el $merge intenta guardar un documento que viola algún índice único en la colección de salida, la operación genera un error. Por ejemplo:

Si su colección utiliza validación de esquemas y tiene validationAction configurado en error, insertar un documento no válido o actualizar un documento con valores no válidos con $merge genera un MongoServerError y el documento no se escribe en la colección de destino. Si hay varios documentos no válidos, solo el primer documento no válido que se encuentre generará un error. Todos los documentos válidos se guardan en la colección de destino, y todos los documentos no válidos fallan al intentar guardarse.

Si todas las siguientes condiciones son verdaderas para una etapa $merge, $merge inserta el documento directamente en la colección de salida:

  • El valor de whenMatched es una canalización de agregación,

  • El valor de whenNotMatched es insert, y

  • No hay coincidencia para un documento en la colección de salida,

$merge inserta el documento directamente en la colección de salida.

Con la introducción de $merge, MongoDB proporciona dos etapas, $merge y $out, para escribir los resultados de la canalización de agregación en una colección:

  • Se puede enviar a una colección en la misma base de datos o en una base de datos diferente.

  • Se puede enviar a una colección en la misma base de datos o en una base de datos diferente.

  • Cree una nueva colección si la colección de salida no existe ya.

  • Cree una nueva colección si la colección de salida no existe ya.

  • Puede incorporar resultados (insertar nuevos documentos, fusionar documentos, reemplazar documentos, mantener documentos existentes, fallar la operación, procesar documentos con un pipeline de actualización personalizado) en una colección existente.

  • Se debe reemplazar por completo la colección de salida si ya existe.

  • Puede dar salida a una colección fragmentada. La colección de entrada también se puede fragmentar.

  • No se puede enviar a una colección particionada. Sin embargo, la colección de entrada se puede particionar.

  • Corresponde a las instrucciones SQL:

    • MERGE.

    • INSERT INTO T2 SELECT FROM T1.

    • SELECT INTO T2 FROM T1.

    • Crear/actualizar vistas materializadas.

  • Corresponde a la instrucción SQL:

    • INSERT INTO T2 SELECT FROM T1.

    • SELECT INTO T2 FROM T1.

$merge puede generar salida a la misma colección que se está agregando. También puedes exportar a una colección que aparezca en otras etapas del pipeline, como $lookup.

Restricciones
Descripción

Un pipeline de agregación no puede utilizar $merge dentro de una transacción.

Un pipeline de agregación no puede usar $merge para generar una colección de series de tiempo.

Separate from materialized view

Una definición de vista no puede incluir la etapa $merge. Si la definición de la vista incluye una pipeline anidada (por ejemplo, la definición de la vista incluye la etapa $facet), esta restricción de la etapa $merge también se aplica a las pipelines anidadas.

$lookup etapa

$lookup El pipeline anidado de la etapa no puede incluir la etapa $merge.

$facet etapa

$facet El pipeline anidado de la etapa no puede incluir la etapa $merge.

$unionWith El pipeline anidado de la etapa no puede incluir la etapa $merge.

"linearizable" readConcern

La etapa $merge no se puede usar junto con el nivel de consistencia de lectura "linearizable". Es decir, si especificas el "linearizable" nivel de consistencia de lectura para db.collection.aggregate(), no puedes incluir la etapa $merge en la pipeline.

Si la colección de salida no existe, el $merge crea la colección.

Por ejemplo, una colección llamada salaries en la base de datos zoo se llena con el salario de los empleados y el historial de los departamentos:

db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
])

Puedes utilizar las etapas $group y $merge para crear inicialmente una colección llamada budgets (en la base de datos reporting) a partir de los datos que se encuentran actualmente en la colección salaries:

Nota

Para un set de réplicas o una implementación autónoma, si la base de datos de salida no existe, $merge también crea la base de datos.

Para una implementación de clúster particionado, la base de datos de salida especificada ya debe existir.

db.getSiblingDB("zoo").salaries.aggregate( [
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
  • Etapa $group para agrupar los salarios por el fiscal_year y el dept.

  • La etapa $merge guarda la salida de la etapa anterior $group en la colección budgets de la base de datos reporting.

Para ver los documentos en la nueva colección budgets:

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

La colección budgets contiene los siguientes documentos:

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

El siguiente ejemplo usa las colecciones del ejemplo anterior.

La colección de ejemplo salaries contiene el salario de los empleados y el historial de los departamentos:

db.salaries.insertMany( [
{ _id : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ _id : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ _id : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ _id : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ _id : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ _id : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ _id : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ _id : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ _id : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ _id : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
] )

La colección de ejemplo budgets contiene los presupuestos anuales acumulados:

db.budgets.insertMany( [
{ _id : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 },
{ _id : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 },
{ _id : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 },
{ _id : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 },
{ _id : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 },
{ _id : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
] )

Durante el ejercicio fiscal actual (2019 en este ejemplo), se agregan nuevos empleados a la colección salaries y se preasignan nuevos recuentos de personal para el año siguiente:

db.getSiblingDB("zoo").salaries.insertMany( [
{ _id : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 },
{ _id : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 },
{ _id : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 },
{ _id : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 }
] )

Para actualizar la colección budgets para que refleje la nueva información salarial, el siguiente pipeline de agregación usa:

  • Etapa $match para encontrar todos los documentos con fiscal_year mayor o igual que 2019.

  • Etapa $group para agrupar los salarios por el fiscal_year y el dept.

  • $merge para guardar el conjunto de resultados en la colección budgets, reemplazando documentos con el mismo valor _id (en este ejemplo, un documento con el año fiscal y el departamento). Para los documentos que no tienen coincidencias en la colección, $merge inserta los nuevos documentos.

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match : { fiscal_year: { $gte : 2019 } } },
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )

Después de ejecutar la agregación, visualice los documentos en la colección budgets:

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

La colección budgets incorpora los nuevos datos salariales para el año fiscal 2019 y agrega nuevos documentos para el año fiscal 2020:

{ _id : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ _id : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ _id : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ _id : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ _id : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
{ _id : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
{ _id : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }

Para asegurar que el $merge no sobrescriba datos existentes en la colección, configura whenMatched a keepExisting o fail.

La colección de ejemplo salaries en la base de datos zoo contiene el salario de los empleados y el historial de departamentos:

db.salaries.insertMany( [
{ _id : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ _id : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ _id : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ _id : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ _id : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ _id : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ _id : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ _id : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ _id : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ _id : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
] )

Una colección orgArchive en la base de datos reporting contiene registros históricos de la organización departamental de los años fiscales pasados. Los registros archivados no deben modificarse.

db.orgArchive.insertMany( [
{ _id : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 },
{ _id : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 },
{ _id : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 },
{ _id : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
] )

La colección orgArchive tiene un índice compuesto único en los campos fiscal_year y dept. Específicamente, debe haber como máximo un registro para la misma combinación de año fiscal y departamento:

db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )

Al final del año fiscal en curso (2019 en este ejemplo), la colección salaries contiene los siguientes documentos:

db.salaries.insertMany( [
{ _id : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 },
{ _id : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 },
{ _id : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 },
{ _id : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 },
{ _id : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 },
{ _id : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 },
{ _id : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 },
{ _id : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 },
{ _id : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 },
{ _id : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 },
{ _id : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 },
{ _id : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 },
{ _id : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 },
{ _id : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
] )

Para actualizar la colección orgArchive para incluir el año fiscal 2019 que acaba de finalizar, se utiliza la siguiente canalización de agregación:

  • Etapa $match para encontrar todos los documentos con fiscal_year igual a 2019.

  • Etapa $group para agrupar a los empleados por fiscal_year y dept.

  • La etapa $project para suprimir el campo _id y añadir campos separados para dept y fiscal_year. Cuando los documentos se pasan a $merge, $merge genera automáticamente un nuevo campo _id para los documentos.

  • $merge para guardar el conjunto de resultados en orgArchive.

    La etapa $merge coincide los documentos en los campos dept y fiscal_year y fails cuando coinciden. Es decir, si ya existe un documento para el mismo departamento y año fiscal, se producen los errores de $merge.

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match: { fiscal_year: 2019 }},
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } },
{ $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } },
{ $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } }
] )

Después de la operación, la colección orgArchive contiene los siguientes documentos:

{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }

Si la colección orgArchive ya contenía un documento para 2019 del departamento "A" y/o "B", la agregación falla debido al error de clave duplicada. Sin embargo, cualquier documento insertado antes del error no será revertido.

Si especificas keepExisting para el documento coincidente, la agregación no afecta al documento coincidente y no genera un error de clave duplicada. De manera similar, si especificas reemplazar, la operación no fallaría; sin embargo, la operación reemplazaría el documento existente.

Por defecto, si un documento en los resultados de la agregación coincide con un documento en la colección, la etapa $merge fusiona los documentos.

Una colección de ejemplo purchaseorders se completa con la información de los pedidos de compra por trimestre y regiones:

db.purchaseorders.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
{ _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
{ _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
] )

Otra colección de ejemplo reportedsales se completa con la información de ventas informada por trimestre y regiones:

db.reportedsales.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
] )

Supongamos que, para fines de elaboración de reportes, se desea ver los datos por trimestre en el siguiente formato:

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

Puedes usar $merge para fusionar resultados de la colección purchaseorders y la colección reportedsales para crear una nueva colección quarterlyreport.

Para crear la colección quarterlyreport, puede utilizar el siguiente pipeline:

db.purchaseorders.aggregate( [
{ $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
Primera etapa:

La etapa $group agrupa por trimestre y usa $sum para agregar los campos qty en un nuevo campo purchased. Por ejemplo:

Para crear la colección quarterlyreport, puede utilizar este pipeline:

{ "_id" : "2019Q2", "purchased" : 1700 }
{ "_id" : "2019Q1", "purchased" : 1200 }
Segunda fase:
La etapa $merge escribe los documentos en la colección quarterlyreport en la misma base de datos. Si la etapa encuentra un documento existente en la colección que coincide en el campo _id, la etapa fusiona los documentos coincidentes. De lo contrario, la etapa inserta el documento. Para la creación inicial, ningún documento debería coincidir.

Para ver los documentos en la colección, ejecute la siguiente operación:

db.quarterlyreport.find().sort( { _id: 1 } )

La colección contiene los siguientes documentos:

{ "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }

De manera similar, se debe ejecutar el siguiente pipeline de agregación en la colección reportedsales para combinar los resultados de ventas en la colección quarterlyreport.

db.reportedsales.aggregate( [
{ $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
Primera etapa:

La etapa $group agrupa por trimestre y usa $sum para agregar los campos qty en un nuevo campo sales. Por ejemplo:

{ "_id" : "2019Q2", "sales" : 500 }
{ "_id" : "2019Q1", "sales" : 1950 }
Segunda fase:
La etapa $merge guarda los documentos en la colección quarterlyreport en la misma base de datos. Si la etapa encuentra un documento existente en la colección que coincide con el campo _id (el trimestre), la etapa fusiona los documentos coincidentes. De lo contrario, la fase inserta el documento.

Para ver los documentos en la colección quarterlyreport después de que los datos se hayan fusionado, ejecute la siguiente operación:

db.quarterlyreport.find().sort( { _id: 1 } )

La colección contiene los siguientes documentos:

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

El $merge puede usar un pipeline de actualización personalizada cuando los documentos coinciden. El pipeline whenMatched puede tener las siguientes etapas:

Una colección de ejemplo votes se completa con el recuento diario de votos. Se debe crear la colección con los siguientes documentos:

db.votes.insertMany( [
{ date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
{ date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
{ date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
{ date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
] )

Otra colección de ejemplo monthlytotals tiene los totales de votos mensuales más recientes. Se debe crear la colección con el siguiente documento:

db.monthlytotals.insertOne(
{ "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 }
)

Al final de cada día, los votos de ese día se insertan en la colección votes:

db.votes.insertOne(
{ date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
)

Puedes usar $merge con una pipeline personalizada para actualizar el documento existente en la colección monthlytotals:

db.votes.aggregate([
{ $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
{ $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
{ $merge: {
into: "monthlytotals",
on: "_id",
whenMatched: [
{ $addFields: {
thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] },
thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
} } ],
whenNotMatched: "insert"
} }
])
Primera etapa:

La etapa $match encuentra los votos del día específico. Por ejemplo:

{ "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }
Segunda fase:

La etapa $project establece el campo _id en una string de año-mes. Por ejemplo:

{ "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }
Tercera etapa:

La etapa $merge guarda los documentos en la colección monthlytotals en la misma base de datos. Si la etapa encuentra un documento existente en la colección que coincide con el campo _id, la etapa utiliza un pipeline para añadir los votos de thumbsup y los votos de thumbsdown.

  • Este pipeline no puede acceder directamente a los campos del documento de resultados. Para acceder al campo thumbsup y al campo thumbsdown en el documento de resultados, el pipeline utiliza la variable $$new; es decir, $$new.thumbsup y $new.thumbsdown.

  • Este pipeline puede acceder directamente al campo thumbsup y al campo thumbsdown en el documento existente en la colección; es decir, $thumbsup y $thumbsdown.

El documento resultante sustituye al documento existente.

Para ver los documentos en la colección monthlytotals después de la operación de fusión, ejecute la siguiente operación:

db.monthlytotals.find()

La colección contiene el siguiente documento:

{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }

Puedes utilizar variables en la etapa $merge en el campo whenMatched. Es necesario definir las variables antes de poder utilizarlas.

Defina variables en uno o en ambos de los siguientes:

Para usar variables en whenMatched:

Se debe especificar el prefijo del signo de dólar doble ($$) junto con el nombre de la variable en el formato $$<variable_name>. Por ejemplo, $$year. Si la variable está configurada como un documento, también se puede incluir un campo de documento en el formulario $$<variable_name>.<field>. Por ejemplo, $$year.month.

Las pestañas a continuación demuestran el comportamiento cuando las variables se definen en la etapa de fusión, el comando de agregación, o en ambos.

Puede definir variables en la etapa $merge let y usar las variables en el campo whenMatched.

Ejemplo:

db.cakeSales.insertOne( [
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
] )
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {}
} )
db.cakeSales.find()

El ejemplo:

  • crea una colección llamada cakeSales

  • ejecuta un comando aggregate que define una variable year en el $merge let y añade el año a cakeSales usando whenMatched

  • retrieves the cakeSales document

Salida:

{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }

Nuevo en la versión 5.0.

Puedes definir variables en el comando aggregate let y usar las variables en la etapa $merge campo whenMatched.

Ejemplo:

db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
whenMatched: [ {
$addFields: { "salesYear": "$$year" } }
] }
}
],
cursor: {},
let : { year: "2020" }
} )
db.cakeSales.find()

El ejemplo:

  • crea una colección llamada cakeSales

  • ejecuta un comando aggregate que define una variable year en el comando aggregate let y añade el año a cakeSales usando whenMatched

  • retrieves the cakeSales document

Salida:

{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }

Puede definir variables en la $merge etapa y, a partir de MongoDB 5.0, el comando aggregate.

Si se definen dos variables con el mismo nombre en la etapa $merge y el comando aggregate, se utiliza la variable de la etapa $merge.

En este ejemplo, la variable de etapa year: "2020" $merge se utiliza en lugar de la variable de comando year: "2019" aggregate:

db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {},
let : { year: "2019" }
} )
db.cakeSales.find()

Salida:

{
_id: 1,
flavor: 'chocolate',
salesTotal: 1580,
salesTrend: 'up',
salesYear: '2020'
}

Los ejemplos de C# en esta página utilizan la base de datos sample_mflix de los conjuntos de datos de muestra de Atlas. Para aprender a crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulta Primeros pasos en la documentación del controlador de MongoDB .NET/C#.

La siguiente clase Movie modela los documentos en la colección sample_mflix.movies:

public class Movie
{
public ObjectId Id { get; set; }
public int Runtime { get; set; }
public string Title { get; set; }
public string Rated { get; set; }
public List<string> Genres { get; set; }
public string Plot { get; set; }
public ImdbData Imdb { get; set; }
public int Year { get; set; }
public int Index { get; set; }
public string[] Comments { get; set; }
[BsonElement("lastupdated")]
public DateTime LastUpdated { get; set; }
}

Nota

ConventionPack para Pascal Case

Las clases de C# en esta página utilizan Pascal case para los nombres de sus propiedades, pero los nombres de los campos en la colección de MongoDB utilizan camel case. Para tener en cuenta esta diferencia, se puede usar el siguiente código para registrar un ConventionPack cuando la aplicación se inicie:

var camelCaseConvention = new ConventionPack { new CamelCaseElementNameConvention() };
ConventionRegistry.Register("CamelCase", camelCaseConvention, type => true);

Para usar el driver de MongoDB .NET/C# para agregar una etapa $merge a un pipeline de agregación, llama al método Merge() en un objeto PipelineDefinition.

Cuando se llame al método Merge(), se debe pasar una instancia de la clase MergeStageOptions. Este objeto permite especificar opciones para la etapa $merge, como la forma de gestionar documentos coincidentes.

El siguiente ejemplo crea una etapa del pipeline que fusiona los documentos del pipeline en la colección movies. El objeto MergeStageOptions especifica las siguientes opciones:

  • La opción OnFieldNames especifica que la operación debe usar los campos "id" y "title" para encontrar documentos coincidentes en la colección de origen y en la colección movies.

  • La opción WhenMatched especifica que si un documento en la colección fuente coincide con un documento en la colección movies, debe reemplazar el documento en la colección movies.

  • La opción WhenNotMatched especifica que si un documento en la colección fuente no coincide con un documento en la colección movies, debe insertarse en la colección movies.

var movieCollection = client
.GetDatabase("sample_mflix")
.GetCollection<Movie>("movies");
var pipeline = new EmptyPipelineDefinition<Movie>()
.Merge(movieCollection,
new MergeStageOptions<Movie>(
{
OnFieldNames = new List<string>() {"id", "title"},
WhenMatched = MergeStageWhenMatched.Replace,
WhenNotMatched = MergeStageWhenNotMatched.Insert,
});

Los ejemplos de Node.js en esta página utilizan la base de datos sample_mflix de los conjuntos de datos de muestra de Atlas. Para aprender a crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulte Primeros pasos en la documentación del controlador de MongoDB Node.js.

Para utilizar el controlador de MongoDB Node.js para agregar una etapa de $merge a una canalización de agregación, utilice el Operador $merge en un objeto de canalización.

El siguiente ejemplo crea una etapa de pipeline que fusiona los documentos del pipeline en la colección movies. El ejemplo incluye los siguientes campos:

  • La opción on especifica que la operación debe usar los campos "_id" y "title" para encontrar documentos coincidentes en la colección de origen y en la colección movies.

  • La opción whenMatched especifica que si un documento en la colección fuente coincide con un documento en la colección movies, reemplaza el documento en la colección movies.

  • La opción whenNotMatched especifica que si un documento en la colección de origen no coincide con un documento en la colección movies, la operación inserta el documento en la colección movies.

A continuación, el ejemplo ejecuta la canalización de agregación:

const pipeline = [
{
$merge: {
into: "movies",
on: ["_id", "title"],
whenMatched: "replace",
whenNotMatched: "insert"
}
}
];
const cursor = collection.aggregate(pipeline);
return cursor;

Volver

Coincidencia

En esta página