Definición
db.collection.watch( pipeline, options )Importante
Método mongosh
Esta página documenta un método
mongosh. Esta no es la documentación para los comandos de base de datos ni para los drivers específicos de lenguajes, como Nodo.js.Para el comando de base de datos, consulte el
aggregateComando con la$changeStreametapa de agregación.Para los drivers de API de MongoDB, consulte la documentación del driver de MongoDB específica del lenguaje.
Solo para Sets de réplicas y clústeres fragmentados
Abre un cursor de flujo de cambios en la colección.
ParameterTipoDescripciónpipelinearreglo
Opcional. Una canalización de agregación que consiste en una o más de las siguientes etapas de agregación:
Especifica un pipeline para filtrar o modificar la salida de los eventos de cambio.
A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación de flujos de cambios modifica el campo _id de un evento.
optionsDocumento
Opcional. Opciones adicionales que modifican el comportamiento de
watch().El documento
optionspuede contener los siguientes campos y valores:CampoTipoDescripciónresumeAfterDocumento
Opcional. Indica a
watch()que intente reanudar las notificaciones comenzando después de la operación especificada en el token de reanudación.Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo
_id. Pasa todo el campo_iddel documento de evento de cambio que representa la operación que deseas reanudar después.resumeAfteres mutuamente excluyente constartAfterystartAtOperationTime.startAfterDocumento
Opcional. Indica a
watch()que intente iniciar un nuevo flujo de cambios después de la operación especificada en el token de reanudación. Permite que las notificaciones se reanuden después de un evento de invalidación.Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo
_id. Pasa todo el campo_iddel documento de evento de cambio que representa la operación que deseas reanudar después.startAfteres mutuamente excluyente conresumeAfterystartAtOperationTime.fullDocumentstring
Opcional. Por defecto,
watch()devuelve el delta de los campos que han sido modificados por una operación de actualización, en lugar de todo el documento actualizado.Configura
fullDocumenten"updateLookup"para dirigir awatch()a buscar la versión más reciente del documento actualizado que haya sido comprometida por la mayoría.watch()devuelve un campofullDocumentcon la búsqueda de documentos además del delta deupdateDescription.A partir de MongoDB 6.0, puede configurar
fullDocumentpara:"whenAvailable"para mostrar el documento después de la imagen, si está disponible, después de insertar, reemplazar o actualizar el documento."required"para generar la imagen del documento después de que se haya insertado, reemplazado o actualizado. Genera un error si la imagen de publicación no está disponible.
fullDocumentBeforeChangestring
Opcional.
A partir de MongoDB 6.0, puede usar el nuevo campo
fullDocumentBeforeChangey configurarlo en:"whenAvailable"para mostrar la imagen anterior del documento, si está disponible, antes de que el documento sea reemplazado, actualizado o borrado."required"para mostrar la imagen previa del documento antes de que el documento fuera reemplazado, actualizado o borrado. Genera un error si la pre-imagen no está disponible."off"para suprimir la imagen anterior del documento."off"es el valor por defecto.
batchSizeInt
Opcional. El número máximo de documentos que se pueden devolver en cada agrupación de un flujo de cambios. Por defecto,
watch()tiene un tamaño de agrupación inicial de la menor de101documentos o 16 mebibytes (MiB) de documentos. Las agrupaciones posteriores tienen un tamaño máximo de 16 mebibytes. Esta opción puede aplicar un límite inferior a 16 MiB, pero no uno superior. Cuando se establece,batchSizees el menor debatchSizedocumentos o 16 MiB de documentos.Tiene la misma funcionalidad que
cursor.batchSize().maxAwaitTimeMSInt
Opcional. El tiempo máximo en milisegundos que el servidor espera para que los nuevos cambios de datos se informen al cursor del flujo de cambios antes de devolver una agrupación vacía.
Se establece por defecto en
1000milisegundos.collationDocumento
Opcional. Pase un documento de intercalación para especificar una intercalación para el cursor del flujo de cambios.
Se utiliza por defecto la comparación binaria
simplesi se omite.showExpandedEventsbooleano
Opcional. A partir de MongoDB 6.0, los flujos de cambios admiten notificaciones de cambios para eventos DDL, como los eventos createIndexes y dropIndexes. Para incluir eventos expandidos en un flujo de cambios, crea el cursor del flujo de cambios utilizando la opción
showExpandedEvents.Novedades en la versión 6.0.
startAtOperationTimeMarca de tiempo
Opcional. El punto de inicio para el flujo de cambios. Si el punto de inicio especificado está en el pasado, debe encontrarse dentro del rango de tiempo del oplog. Para comprobar el rango temporal del oplog, consulta
rs.printReplicationInfo().startAtOperationTimees mutuamente excluyente conresumeAfterystartAfter.Devuelve: Un cursor que permanece abierto mientras una conexión con la implementación de MongoDB esté abierta y la colección exista. Consultar Eventos de Cambio para ejemplos de documentos de eventos de cambio. Tip
Compatibilidad
Este método está disponible en implementaciones alojadas en los siguientes entornos:
MongoDB Atlas: El servicio totalmente gestionado para implementaciones de MongoDB en la nube
Nota
Este comando es compatible con todos los clústeres de MongoDB Atlas. Para obtener información sobre el soporte de Atlas para todos los comandos, consulte Comandos no compatibles.
MongoDB Enterprise: La versión basada en suscripción y autogestionada de MongoDB
MongoDB Community: La versión de MongoDB con código fuente disponible, de uso gratuito y autogestionada.
Disponibilidad
Implementación
db.collection.watch() está disponible para implementaciones de Sets de réplicas y clústeres:
Para un set de réplicas, se puede emitir
db.collection.watch()en cualquier Nodo que contenga datos.Para un clúster particionado, se debe emitir
db.collection.watch()en una instanciamongos.
Motor de almacenamiento
Solo se puede usar db.collection.watch() con el motor de almacenamiento WiredTiger.
Leer Preocupación majority Soporte
A partir de MongoDB,4.2 losflujos de cambio están disponibles independientemente del "majority" soporte de lectura de preocupación; es decir, el soporte de lectura de majority preocupación puede estar habilitado (predeterminado) o deshabilitado para usar flujos de cambio.
En MongoDB 4.0 y versiones anteriores, losflujos de cambio solo están disponibles si "majority" el soporte de lectura de inquietudes está habilitado (predeterminado).
Comportamiento
db.collection.watch()solo notifica a la mayoría de los miembros portadores de datos sobre cambios en los datos que han persistido.El cursor de flujo de cambios permanece abierto hasta que ocurra una de las siguientes situaciones:
El cursor se cierra explícitamente.
Se produce un evento de invalidación; por ejemplo, un descarte o cambio de nombre de una colección.
La conexión a la implementación de MongoDB se cierra o se agota el tiempo de espera. Consulta Comportamiento para obtener más información.
Si la implementación es un clúster fragmentado, la eliminación de un fragmento puede provocar que se cierre un cursor de flujo de cambio abierto y que el cursor de flujo de cambio cerrado no pueda reanudarse por completo.
Reanudabilidad
A diferencia de los controladores de MongoDB, mongosh no intenta automáticamente reanudar un cursor de flujo de cambios después de un error. Los controladores de MongoDB hacen un intento de reanudar automáticamente un cursor de flujo de cambios después de ciertos errores.
db.collection.watch() utiliza la información almacenada en el oplog para generar la descripción del evento de cambio y crear un token de reanudación asociado a esa operación. Si la operación identificada por el token de reanudación pasado a la opción resumeAfter o startAfter ya ha salido del oplog, db.collection.watch() no puede reanudar el flujo de cambios.
Consulte Reanudar un flujo de cambios para obtener más información sobre cómo reanudar un flujo de cambios.
Nota
No se puede usar
resumeAfterpara reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o cambio de nombre de colección) cierre el flujo. En su lugar, se puede utilizar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.Si la implementación es un clúster fragmentado, la eliminación de un fragmento puede provocar que se cierre un cursor de flujo de cambio abierto y que el cursor de flujo de cambio cerrado no pueda reanudarse por completo.
Nota
No se puede usar resumeAfter para reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o cambio de nombre de colección) cierre el flujo. En su lugar, se puede utilizar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.
Consulta completa de documentos de operaciones de actualización
Por defecto, el cursor de flujo de cambios devuelve cambios/deltas específicos de campo para las operaciones de actualización. También puedes configurar el flujo de cambios para buscar y devolver la versión actual del documento modificado que ha sido comprometida por la mayoría. Dependiendo de otras operaciones de guardado que puedan haber ocurrido entre la actualización y la búsqueda, el documento devuelto puede diferir significativamente del documento en el momento de la actualización.
Dependiendo del número de cambios aplicados durante la operación de actualización y del tamaño del documento completo, existe el riesgo de que el tamaño del documento de evento de cambio para una operación de actualización sea mayor que el límite de 16 MB para documentos BSON. Si esto ocurre, el servidor cierra el cursor del flujo de cambios y devuelve un error.
Control de acceso
Cuando se ejecuta con control de acceso, el usuario debe tener los privilegios de acción find y changeStream sobre el recurso de colección. Es decir, un usuario debe tener un rol que le conceda el siguiente privilegio:
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
El rol read incorporado proporciona los privilegios apropiados.
Iteración de cursor
MongoDB ofrece múltiples maneras de iterar sobre un cursor.
El cursor.hasNext() método se bloquea y espera el siguiente evento. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza hasNext() de esta manera:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
El cursor.tryNext() método es no bloqueante. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza tryNext() de esta manera:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Ejemplos
Abre un flujo de cambios
La siguiente operación abre un cursor de flujo de cambios en la colección data.sensors:
watchCursor = db.getSiblingDB("data").sensors.watch()
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.tryNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Para obtener la documentación completa sobre la salida del flujo de cambios, consultar Eventos de cambios.
Nota
No puedes usar isExhausted() con flujos de cambios.
Flujo de cambios con búsqueda de actualización de documentos completos
Configurar la opción fullDocument en "updateLookup" para dirigir el cursor del flujo de cambios a buscar la versión más reciente comprometida por mayoría del documento asociado a un evento de actualización del flujo de cambios.
La siguiente operación abre un cursor de flujo de cambios contra la colección data.sensors usando la opción fullDocument : "updateLookup".
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.tryNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Para cualquier operación de actualizar, el evento de cambio devuelve el resultado de la búsqueda del documento en el campo fullDocument.
Para un ejemplo del resultado completo de la actualización del documento, vea evento de actualización del flujo de cambios.
Para obtener la documentación completa sobre la salida del flujo de cambios, consultar Eventos de cambios.
Change Streams con imágenes previas y posteriores de los documentos
A partir de MongoDB 6.0, puedes utilizar eventos de flujo de cambios para generar la versión de un documento antes y después de los cambios (las imágenes previas y posteriores del documento):
La imagen previa es el documento antes de reemplazarlo, actualizarlo o borrarlo. No existe una imagen previa para un documento insertado.
La imagen posterior es el documento tras insertarse, sustituirse o actualizarse. No hay imagen posterior para un documento borrado.
Activa
changeStreamPreAndPostImagespara una colección condb.createCollection(),createocollMod.
Las imágenes previas y posteriores no están disponibles para un evento de flujo de cambios si las imágenes fueron:
No está habilitado en la colección en el momento de la operación de actualización o eliminación de un documento.
Eliminado después del tiempo de retención de imágenes previas y posteriores establecido en
expireAfterSeconds.El siguiente ejemplo configura
expireAfterSecondsen100segundos en todo el clúster:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) El siguiente ejemplo devuelve la configuración actual de
changeStreamOptions, incluyendoexpireAfterSeconds:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) Configurar
expireAfterSecondsenoffutiliza la política de retención por defecto: las imágenes previas y posteriores se conservan hasta que los eventos correspondientes del flujo de cambios se eliminan del oplog.Si se elimina un evento de flujo de cambios del oplog, las imágenes previas y posteriores correspondientes también se eliminan independientemente del tiempo de retención de las imágenes previas y posteriores
expireAfterSeconds.
Consideraciones adicionales:
Habilitar las imágenes previas y posteriores consume espacio de almacenamiento y aumenta el tiempo de procesamiento. Habilita solo las imágenes previas y de publicación si las necesitas.
Limita el tamaño del evento del flujo de cambios a menos de 16 mebibytes. Para limitar el tamaño del evento, puedes:
Limita el tamaño del documento a 8 megabytes. Puedes solicitar imágenes previas y posteriores simultáneamente en la salida del flujo de cambios si otros campos de eventos del flujo de cambios como
updateDescriptionno son grandes.Solicita solo imágenes posteriores en la salida del flujo de cambios para documentos de hasta 16 mebibytes si otros campos de eventos del flujo de cambios como
updateDescriptionno son grandes.Solicita solo imágenes previas en la salida del flujo de cambios para documentos de hasta 16 mebibytes si:
las actualizaciones de documento afectan solo a una pequeña fracción de la estructura o el contenido del documento, y
no cause un evento de cambio
replace. Un eventoreplacesiempre incluye la imagen de publicación.
Para solicitud de una pre-imagen, configura
fullDocumentBeforeChangeenrequiredowhenAvailableendb.collection.watch(). Para solicitar una imagen de publicación, configurafullDocumentusando el mismo método.Las preimágenes se escriben en la colección
config.system.preimages.La colección
config.system.preimagespuede agrandarse. Para limitar el tamaño de la colección, puedes establecer el tiempo aexpireAfterSecondspara las imágenes previas como se mostró antes.Las imágenes previas se eliminan de forma asincrónica mediante un proceso en segundo plano.
Importante
Característica incompatible con versiones anteriores
A partir de MongoDB 6.0, si utilizas imágenes previas y posteriores de documentos para los flujos de cambios, debes deshabilitar changeStreamPreAndPostImages para cada colección mediante el comando collMod antes de poder volver a una versión anterior de MongoDB.
Tip
Para los eventos y resultados del flujo de cambios, consulta Eventos de cambio.
Para monitorear una colección en busca de cambios, consulta
db.collection.watch().Para ejemplos completos con la salida del flujo de cambios, consulta Change Streams con imágenes anteriores y posteriores de documentos.
Crear colección
Crear una colección temperatureSensor que tenga changeStreamPreAndPostImages habilitada:
db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } )
Rellenar la colección temperatureSensor con lecturas de temperatura:
db.temperatureSensor.insertMany( [ { "_id" : 0, "reading" : 26.1 }, { "_id" : 1, "reading" : 25.9 }, { "_id" : 2, "reading" : 24.3 }, { "_id" : 3, "reading" : 22.4 }, { "_id" : 4, "reading" : 24.6 } ] )
Las siguientes secciones muestran ejemplos de flujos de cambios para imágenes previas y posteriores de documentos que utilizan la colección temperatureSensor.
Flujo de cambios con imagen anterior del documento
Utilizas la configuración fullDocumentBeforeChange: "whenAvailable" para generar la imagen anterior del documento, si está disponible. La imagen anterior es el documento antes de ser reemplazado, actualizado o borrado. No existe una imagen anterior para un documento insertado.
El siguiente ejemplo crea un cursor de flujo de cambios para la colección temperatureSensor utilizando fullDocumentBeforeChange:
"whenAvailable":
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch( [], { fullDocumentBeforeChange: "whenAvailable" } )
El siguiente ejemplo utiliza el cursor para verificar nuevos eventos de flujo de cambios:
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) { if ( watchCursorFullDocumentBeforeChange.hasNext() ) { printjson( watchCursorFullDocumentBeforeChange.next() ); } }
En el ejemplo:
El bucle
whilese ejecuta hasta que el cursor se cierra.hasNext()devuelvetruesi el cursor contiene documentos.
El siguiente ejemplo actualiza el campo reading de un documento temperatureSensor:
db.temperatureSensor.updateOne( { _id: 2 }, { $set: { reading: 22.1 } } )
Después de que se actualice el documento temperatureSensor, el evento de cambio genera la imagen anterior del documento en el campo fullDocumentBeforeChange. La imagen anterior contiene el campo temperatureSensor del documento reading antes de que se actualizara. Por ejemplo:
{ "_id" : { "_data" : "82624B21...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1649090957, 1), "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 2 }, "updateDescription" : { "updatedFields" : { "reading" : 22.1 }, "removedFields" : [ ], "truncatedArrays" : [ ] }, "fullDocumentBeforeChange" : { "_id" : 2, "reading" : 24.3 } }
Tip
Para obtener detalles de salida de la actualización de documentos, consulte Eventos de actualización del flujo de cambios.
Para obtener detalles sobre la salida del flujo de cambios, consulta Eventos de Cambio.
Flujo de cambios con imagen después de la publicación del documento
Utiliza la configuración fullDocument: "whenAvailable" para generar la imagen posterior del documento, si está disponible. La imagen posterior es el documento después de ser insertado, reemplazado o actualizado. No hay imagen de publicación para un documento borrado.
El siguiente ejemplo crea un cursor de flujo de cambios para la colección temperatureSensor utilizando fullDocument:
"whenAvailable":
watchCursorFullDocument = db.temperatureSensor.watch( [], { fullDocument: "whenAvailable" } )
El siguiente ejemplo utiliza el cursor para verificar nuevos eventos de flujo de cambios:
while ( !watchCursorFullDocument.isClosed() ) { if ( watchCursorFullDocument.hasNext() ) { printjson( watchCursorFullDocument.next() ); } }
En el ejemplo:
El bucle
whilese ejecuta hasta que el cursor se cierra.hasNext()devuelvetruesi el cursor contiene documentos.
El siguiente ejemplo actualiza el campo reading de un documento temperatureSensor:
db.temperatureSensor.updateOne( { _id: 1 }, { $set: { reading: 29.5 } } )
Después de que se actualice el documento temperatureSensor, el evento de cambio genera la imagen de publicación del documento en el campo fullDocument. La imagen posterior contiene el campo temperatureSensor del documento reading después de que fue actualizado. Por ejemplo:
{ "_id" : { "_data" : "8262474D...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1648840090, 1), "fullDocument" : { "_id" : 1, "reading" : 29.5 }, "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 1 }, "updateDescription" : { "updatedFields" : { "reading" : 29.5 }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Tip
Para obtener detalles de salida de la actualización de documentos, consulte Eventos de actualización del flujo de cambios.
Para obtener detalles sobre la salida del flujo de cambios, consulta Eventos de Cambio.
Flujo de cambios con filtro de canalización de agregación
Nota
A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación de flujos de cambios modifica el campo _id de un evento.
La siguiente operación abre un cursor de flujo de cambios en la colección data.sensors utilizando una canalización de agregación para filtrar solo los eventos insert:
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.hasNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
El cursor de flujo de cambios solo devuelve eventos de cambio donde el operationType es insert. Para obtener la documentación completa sobre la salida del flujo de cambios, consulta Eventos de cambio.
Reanudar un flujo de cambios
Cada documento devuelto por un cursor de flujo de cambios incluye un token de reanudación como el campo _id. Para reanudar un flujo de cambios, pase el documento _id completo del evento de cambio del que desea reanudar a la opción resumeAfter o startAfter de watch().
La siguiente operación reanuda un cursor de flujo de cambios en la colección data.sensors utilizando un token de reanudación. Esto asume que la operación que generó el token de reanudación no se ha eliminado del oplog del clúster.
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.hasNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
Consulta Reanudar un flujo de cambios para obtener documentación completa sobre cómo reanudar un flujo de cambios.