Docs Menu
Docs Home
/ /

db.colección.watch() (método mongosh)

Importante

Método mongosh

Esta página documenta una Método mongosh. Esta no es la documentación para comandos de base de datos ni para controladores específicos del lenguaje, como Node.js.

Para el comando de base de datos, consulte el aggregate Comando con la $changeStream etapa de agregación.

Para los drivers de API de MongoDB, consulte la documentación del driver de MongoDB específica del lenguaje.

db.collection.watch( pipeline, options )

Solo para Sets de réplicas y clústeres fragmentados

Abre un cursor de flujo de cambios en la colección.

Devuelve:Un cursor que permanece abierto mientras una conexión con la implementación de MongoDB esté abierta y la colección exista. Consultar Eventos de Cambio para ejemplos de documentos de eventos de cambio.
Parameter
Tipo
Descripción

pipeline

arreglo

Opcional. Una canalización de agregación que consiste en una o más de las siguientes etapas de agregación:

Especifica un pipeline para filtrar o modificar la salida de los eventos de cambio.

A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación de flujos de cambios modifica el campo _id de un evento.

options

Documento

Opcional. Opciones adicionales que modifican el comportamiento de watch().

El documento options puede contener los siguientes campos y valores:

Campo
Tipo
Descripción

resumeAfter

Documento

Opcional. Indica a watch() que intente reanudar las notificaciones comenzando después de la operación especificada en el token de reanudación.

Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo _id. Pasa todo el campo _id del documento de evento de cambio que representa la operación que deseas reanudar después.

resumeAfter es mutuamente excluyente con startAfter y startAtOperationTime.

startAfter

Documento

Opcional. Indica a watch() que intente iniciar un nuevo flujo de cambios después de la operación especificada en el token de reanudación. Permite que las notificaciones se reanuden después de un evento de invalidación.

Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo _id. Pasa todo el campo _id del documento de evento de cambio que representa la operación que deseas reanudar después.

startAfter es mutuamente excluyente con resumeAfter y startAtOperationTime.

fullDocument

string

Opcional. Por defecto, watch() devuelve el delta de los campos que han sido modificados por una operación de actualización, en lugar de todo el documento actualizado.

Configura fullDocument en "updateLookup" para dirigir a watch() a buscar la versión más reciente del documento actualizado que haya sido comprometida por la mayoría. watch() devuelve un campo fullDocument con la búsqueda de documentos además del delta de updateDescription.

A partir de MongoDB 6.0, puede configurar fullDocument para:

  • "whenAvailable" para mostrar el documento después de la imagen, si está disponible, después de insertar, reemplazar o actualizar el documento.

  • "required" para generar la imagen del documento después de que se haya insertado, reemplazado o actualizado. Genera un error si la imagen de publicación no está disponible.

fullDocumentBeforeChange

string

Opcional.

A partir de MongoDB 6.0, puede usar el nuevo campo fullDocumentBeforeChange y configurarlo en:

  • "whenAvailable" para mostrar la imagen anterior del documento, si está disponible, antes de que el documento sea reemplazado, actualizado o borrado.

  • "required" para mostrar la imagen previa del documento antes de que el documento fuera reemplazado, actualizado o borrado. Genera un error si la pre-imagen no está disponible.

  • "off" para suprimir la imagen anterior del documento. "off" es el valor por defecto.

batchSize

Int

Opcional. El número máximo de documentos que se pueden devolver en cada agrupación de un flujo de cambios. Por defecto, watch() tiene un tamaño de agrupación inicial de la menor de 101 documentos o 16 mebibytes (MiB) de documentos. Las agrupaciones posteriores tienen un tamaño máximo de 16 mebibytes. Esta opción puede aplicar un límite inferior a 16 MiB, pero no uno superior. Cuando se establece, batchSize es el menor de batchSize documentos o 16 MiB de documentos.

Tiene la misma funcionalidad que cursor.batchSize().

maxAwaitTimeMS

Int

Opcional. El tiempo máximo en milisegundos que el servidor espera para que los nuevos cambios de datos se informen al cursor del flujo de cambios antes de devolver una agrupación vacía.

Se establece por defecto en 1000 milisegundos.

collation

Documento

Opcional. Pasa un documento de intercalación para especificar una intercalación para el cursor del flujo de cambios.

Se utiliza por defecto la comparación binaria simple si se omite.

showExpandedEvents

booleano

Opcional. A partir de MongoDB 6.0, los flujos de cambios admiten notificaciones de cambios para eventos DDL, como los eventos createIndexes y dropIndexes. Para incluir eventos expandidos en un flujo de cambios, crea el cursor del flujo de cambios utilizando la opción showExpandedEvents.

Novedades en la versión 6.0.

startAtOperationTime

Marca de tiempo

Opcional. El punto de inicio para el flujo de cambios. Si el punto de inicio especificado está en el pasado, debe encontrarse dentro del rango de tiempo del oplog. Para comprobar el rango temporal del oplog, consulta rs.printReplicationInfo().

startAtOperationTime es mutuamente excluyente con resumeAfter y startAfter.

Tip

Este método está disponible en implementaciones alojadas en los siguientes entornos:

  • MongoDB Atlas: El servicio totalmente gestionado para implementaciones de MongoDB en la nube

Nota

Este comando es compatible con todos los clústeres de MongoDB Atlas. Para obtener información sobre el soporte de Atlas para todos los comandos, consulte Comandos no compatibles.

  • MongoDB Enterprise: La versión basada en suscripción y autogestionada de MongoDB

  • MongoDB Community: La versión de MongoDB con código fuente disponible, de uso gratuito y autogestionada.

db.collection.watch() está disponible para implementaciones de Sets de réplicas y clústeres:

Solo se puede usar db.collection.watch() con el motor de almacenamiento WiredTiger.

Los flujos de cambios están disponibles independientemente del soporte de "majority" de nivel de consistencia de lectura; es decir, el soporte de nivel de consistencia de lectura majority puede estar habilitado (por defecto) o deshabilitado para usar flujos de cambios.

  • db.collection.watch() solo notifica a la mayoría de los miembros portadores de datos sobre cambios en los datos que han persistido.

  • El cursor de flujo de cambios permanece abierto hasta que ocurra una de las siguientes situaciones:

    • El cursor se cierra explícitamente.

    • Se produce un evento de invalidación; por ejemplo, un descarte o cambio de nombre de una colección.

    • La conexión a la implementación de MongoDB se cierra o se agota el tiempo de espera. Consulta Comportamiento para obtener más información.

    • Si la implementación es un clúster particionado, la eliminación de una partición puede provocar que un cursor del flujo de cambios abierto se cierre. Es posible que el cursor cerrado del flujo de cambios no sea completamente reanudable.

A diferencia de los controladores de MongoDB, mongosh no intenta automáticamente reanudar un cursor de flujo de cambios después de un error. Los controladores de MongoDB hacen un intento de reanudar automáticamente un cursor de flujo de cambios después de ciertos errores.

db.collection.watch() utiliza la información almacenada en el oplog para generar la descripción del evento de cambio y crear un token de reanudación asociado a esa operación. Si la operación identificada por el token de reanudación pasado a la opción resumeAfter o startAfter ya ha salido del oplog, db.collection.watch() no puede reanudar el flujo de cambios.

Consulte Reanudar un flujo de cambios para obtener más información sobre cómo reanudar un flujo de cambios.

Nota

  • No se puede usar resumeAfter para reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o cambio de nombre de colección) cierre el flujo. En su lugar, se puede utilizar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.

  • Si la implementación es un clúster particionado, la eliminación de una partición puede provocar que un cursor del flujo de cambios abierto se cierre. Es posible que el cursor cerrado del flujo de cambios no sea completamente reanudable.

Nota

No se puede usar resumeAfter para reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o cambio de nombre de colección) cierre el flujo. En su lugar, se puede utilizar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.

Por defecto, el cursor de flujo de cambios devuelve cambios/deltas específicos de campo para las operaciones de actualización. También puedes configurar el flujo de cambios para buscar y devolver la versión actual del documento modificado que ha sido comprometida por la mayoría. Dependiendo de otras operaciones de guardado que puedan haber ocurrido entre la actualización y la búsqueda, el documento devuelto puede diferir significativamente del documento en el momento de la actualización.

Dependiendo del número de cambios aplicados durante la operación de actualización y del tamaño del documento completo, existe el riesgo de que el tamaño del documento de evento de cambio para una operación de actualización sea mayor que el límite de 16 MB para documentos BSON. Si esto ocurre, el servidor cierra el cursor del flujo de cambios y devuelve un error.

Cuando se ejecuta con control de acceso, el usuario debe tener los privilegios de acción find y changeStream sobre el recurso de colección. Es decir, un usuario debe tener un rol que le conceda el siguiente privilegio:

{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }

El rol read incorporado proporciona los privilegios apropiados.

MongoDB ofrece múltiples maneras de iterar sobre un cursor.

El cursor.hasNext() método se bloquea y espera el siguiente evento. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza hasNext() de esta manera:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

El cursor.tryNext() método es no bloqueante. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza tryNext() de esta manera:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

La siguiente operación abre un cursor de flujo de cambios en la colección data.sensors:

watchCursor = db.getSiblingDB("data").sensors.watch()

Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.tryNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

Para obtener la documentación completa sobre la salida del flujo de cambios, consultar Eventos de cambios.

Nota

No puedes usar isExhausted() con flujos de cambios.

Advertencia

En situaciones que implican eliminaciones rápidas o picos de tráfico, configurar fullDocument: "updateLookup" con un $match filtro puede provocar errores de "Token de currículum no encontrado". Esto ocurre cuando la eliminación de un documento hace que el fullDocument campo devuelva un valor nulo, ya que no hay ningún documento coincidente, lo que impide que el flujo de cambios encuentre el token de currículum.

En su lugar, utilice imágenes previas y posteriores con fullDocumentBeforeChange: "whenAvailable" y. Consulte fullDocument: "whenAvailable" la sección "Flujos de cambio con imágenes previas y posteriores del documento".

Configurar la opción fullDocument en "updateLookup" para dirigir el cursor del flujo de cambios a buscar la versión más reciente comprometida por mayoría del documento asociado a un evento de actualización del flujo de cambios.

La siguiente operación abre un cursor de flujo de cambios contra la colección data.sensors usando la opción fullDocument : "updateLookup".

watchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ fullDocument : "updateLookup" }
)

Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.tryNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

Para cualquier operación de actualizar, el evento de cambio devuelve el resultado de la búsqueda del documento en el campo fullDocument.

Para un ejemplo del resultado completo de la actualización del documento, vea evento de actualización del flujo de cambios.

Para obtener la documentación completa sobre la salida del flujo de cambios, consultar Eventos de cambios.

A partir de MongoDB 6.0, puedes utilizar eventos de flujo de cambios para generar la versión de un documento antes y después de los cambios (las imágenes previas y posteriores del documento):

  • La imagen previa es el documento antes de reemplazarlo, actualizarlo o borrarlo. No existe una imagen previa para un documento insertado.

  • La imagen posterior es el documento tras insertarse, sustituirse o actualizarse. No hay imagen posterior para un documento borrado.

  • Activa changeStreamPreAndPostImages para una colección con db.createCollection(), create o collMod. Por ejemplo, cuando se utiliza el comando collMod:

    db.runCommand( {
    collMod: <collection>,
    changeStreamPreAndPostImages: { enabled: true }
    } )

Las imágenes previas y posteriores no están disponibles para un evento de flujo de cambios si las imágenes fueron:

  • No está habilitado en la colección en el momento de la operación de actualización o eliminación de un documento.

  • Eliminado después del tiempo de retención de imágenes previas y posteriores establecido en expireAfterSeconds.

    • El siguiente ejemplo configura expireAfterSeconds en 100 segundos en todo el clúster:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )

      Nota

      El comando setClusterParameter no es compatible con los clústeres de MongoDB Atlas. Para obtener información sobre el soporte de Atlas para todos los comandos, consulta Comandos no compatibles en Atlas.

    • El siguiente ejemplo devuelve la configuración actual de changeStreamOptions, incluyendo expireAfterSeconds:

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • Configurar expireAfterSeconds en off utiliza la política de retención por defecto: las imágenes previas y posteriores se conservan hasta que los eventos correspondientes del flujo de cambios se eliminan del oplog.

    • Si se elimina un evento de flujo de cambios del oplog, las imágenes previas y posteriores correspondientes también se eliminan independientemente del tiempo de retención de las imágenes previas y posteriores expireAfterSeconds.

Consideraciones adicionales:

  • Habilitar las imágenes previas y posteriores consume espacio de almacenamiento y aumenta el tiempo de procesamiento. Habilita solo las imágenes previas y de publicación si las necesitas.

  • Limita el tamaño del evento del flujo de cambios a menos de 16 mebibytes. Para limitar el tamaño del evento, puedes:

    • Limita el tamaño del documento a 8 megabytes. Puedes solicitar imágenes previas y posteriores simultáneamente en la salida del flujo de cambios si otros campos de eventos del flujo de cambios como updateDescription no son grandes.

    • Solicita solo imágenes posteriores en la salida del flujo de cambios para documentos de hasta 16 mebibytes si otros campos de eventos del flujo de cambios como updateDescription no son grandes.

    • Solicita solo imágenes previas en la salida del flujo de cambios para documentos de hasta 16 mebibytes si:

      • las actualizaciones de documento afectan solo a una pequeña fracción de la estructura o el contenido del documento, y

      • no cause un evento de cambio replace. Un evento replace siempre incluye la imagen de publicación.

  • Para solicitud de una pre-imagen, configura fullDocumentBeforeChange en required o whenAvailable en db.collection.watch(). Para solicitar una imagen de publicación, configura fullDocument usando el mismo método.

  • Las imágenes previas se escriben en la colección config.system.preimages.

    • La colección config.system.preimages puede agrandarse. Para limitar el tamaño de la colección, puedes establecer el tiempo a expireAfterSeconds para las imágenes previas como se mostró antes.

    • Las imágenes previas se eliminan de forma asincrónica mediante un proceso en segundo plano.

Importante

Característica incompatible con versiones anteriores

A partir de MongoDB 6.0, si utilizas imágenes previas y posteriores de documentos para los flujos de cambios, debes deshabilitar changeStreamPreAndPostImages para cada colección mediante el comando collMod antes de poder volver a una versión anterior de MongoDB.

Tip

Crear una colección temperatureSensor que tenga changeStreamPreAndPostImages habilitada:

db.createCollection(
"temperatureSensor",
{ changeStreamPreAndPostImages: { enabled: true } }
)

Rellenar la colección temperatureSensor con lecturas de temperatura:

db.temperatureSensor.insertMany( [
{ "_id" : 0, "reading" : 26.1 },
{ "_id" : 1, "reading" : 25.9 },
{ "_id" : 2, "reading" : 24.3 },
{ "_id" : 3, "reading" : 22.4 },
{ "_id" : 4, "reading" : 24.6 }
] )

Las siguientes secciones muestran ejemplos de flujos de cambios para imágenes previas y posteriores de documentos que utilizan la colección temperatureSensor.

Utilizas la configuración fullDocumentBeforeChange: "whenAvailable" para generar la imagen anterior del documento, si está disponible. La imagen anterior es el documento antes de ser reemplazado, actualizado o borrado. No existe una imagen anterior para un documento insertado.

El siguiente ejemplo crea un cursor de flujo de cambios para la colección temperatureSensor utilizando fullDocumentBeforeChange: "whenAvailable":

watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch(
[],
{ fullDocumentBeforeChange: "whenAvailable" }
)

El siguiente ejemplo utiliza el cursor para verificar nuevos eventos de flujo de cambios:

while ( !watchCursorFullDocumentBeforeChange.isClosed() ) {
if ( watchCursorFullDocumentBeforeChange.hasNext() ) {
printjson( watchCursorFullDocumentBeforeChange.next() );
}
}

En el ejemplo:

  • El bucle while se ejecuta hasta que el cursor se cierra.

  • hasNext() devuelve true si el cursor contiene documentos.

El siguiente ejemplo actualiza el campo reading de un documento temperatureSensor:

db.temperatureSensor.updateOne(
{ _id: 2 },
{ $set: { reading: 22.1 } }
)

Después de que se actualice el documento temperatureSensor, el evento de cambio genera la imagen anterior del documento en el campo fullDocumentBeforeChange. La imagen anterior contiene el campo temperatureSensor del documento reading antes de que se actualizara. Por ejemplo:

{
"_id" : {
"_data" : "82624B21...",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1649090957, 1),
"ns" : {
"db" : "test",
"coll" : "temperatureSensor"
},
"documentKey" : {
"_id" : 2
},
"updateDescription" : {
"updatedFields" : {
"reading" : 22.1
},
"removedFields" : [ ],
"truncatedArrays" : [ ]
},
"fullDocumentBeforeChange" : {
"_id" : 2,
"reading" : 24.3
}
}

Tip

Utiliza la configuración fullDocument: "whenAvailable" para generar la imagen posterior del documento, si está disponible. La imagen posterior es el documento después de ser insertado, reemplazado o actualizado. No hay imagen de publicación para un documento borrado.

El siguiente ejemplo crea un cursor de flujo de cambios para la colección temperatureSensor utilizando fullDocument: "whenAvailable":

watchCursorFullDocument = db.temperatureSensor.watch(
[],
{ fullDocument: "whenAvailable" }
)

El siguiente ejemplo utiliza el cursor para verificar nuevos eventos de flujo de cambios:

while ( !watchCursorFullDocument.isClosed() ) {
if ( watchCursorFullDocument.hasNext() ) {
printjson( watchCursorFullDocument.next() );
}
}

En el ejemplo:

  • El bucle while se ejecuta hasta que el cursor se cierra.

  • hasNext() devuelve true si el cursor contiene documentos.

El siguiente ejemplo actualiza el campo reading de un documento temperatureSensor:

db.temperatureSensor.updateOne(
{ _id: 1 },
{ $set: { reading: 29.5 } }
)

Después de que se actualice el documento temperatureSensor, el evento de cambio genera la imagen de publicación del documento en el campo fullDocument. La imagen posterior contiene el campo temperatureSensor del documento reading después de que fue actualizado. Por ejemplo:

{
"_id" : {
"_data" : "8262474D...",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1648840090, 1),
"fullDocument" : {
"_id" : 1,
"reading" : 29.5
},
"ns" : {
"db" : "test",
"coll" : "temperatureSensor"
},
"documentKey" : {
"_id" : 1
},
"updateDescription" : {
"updatedFields" : {
"reading" : 29.5
},
"removedFields" : [ ],
"truncatedArrays" : [ ]
}
}

Tip

Nota

A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación de flujos de cambios modifica el campo _id de un evento.

La siguiente operación abre un cursor de flujo de cambios en la colección data.sensors utilizando una canalización de agregación para filtrar solo los eventos insert:

watchCursor = db.getSiblingDB("data").sensors.watch(
[
{ $match : {"operationType" : "insert" } }
]
)

Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.hasNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:

while (!watchCursor.isClosed()){
if (watchCursor.hasNext()){
printjson(watchCursor.next());
}
}

El cursor de flujo de cambios solo devuelve eventos de cambio donde el operationType es insert. Para obtener la documentación completa sobre la salida del flujo de cambios, consulta Eventos de cambio.

Cada documento devuelto por un cursor de flujo de cambios incluye un token de reanudación como el campo _id. Para reanudar un flujo de cambios, pase el documento _id completo del evento de cambio del que desea reanudar a la opción resumeAfter o startAfter de watch().

La siguiente operación reanuda un cursor de flujo de cambios en la colección data.sensors utilizando un token de reanudación. Esto asume que la operación que generó el token de reanudación no se ha eliminado del oplog del clúster.

let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;
while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}
watchCursor.close();
let resumeToken = firstChange._id;
resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ resumeAfter : resumeToken }
)

Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.hasNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:

while (!resumedWatchCursor.isClosed()){
if (resumedWatchCursor.hasNext()){
print(resumedWatchCursor.next());
}
}

Consulta Reanudar un flujo de cambios para obtener documentación completa sobre cómo reanudar un flujo de cambios.

Volver

db.collection.validate