Importante
Método mongosh
Esta página documenta una Método mongosh. Esta no es la documentación para comandos de base de datos ni para controladores específicos del lenguaje, como Node.js.
Para el comando de base de datos, consulte el aggregate Comando con la $changeStream etapa de agregación.
Para los drivers de API de MongoDB, consulte la documentación del driver de MongoDB específica del lenguaje.
Definición
db.collection.watch( pipeline, options )Solo para Sets de réplicas y clústeres fragmentados
Abre un cursor de flujo de cambios en la colección.
Devuelve: Un cursor que permanece abierto mientras una conexión con la implementación de MongoDB esté abierta y la colección exista. Consultar Eventos de Cambio para ejemplos de documentos de eventos de cambio.
Parámetros
Parameter | Tipo | Descripción |
|---|---|---|
| arreglo | Opcional. Una canalización de agregación que consiste en una o más de las siguientes etapas de agregación: Especifica un pipeline para filtrar o modificar la salida de los eventos de cambio. A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación de flujos de cambios modifica el campo _id de un evento. |
| Documento | Opcional. Opciones adicionales que modifican el comportamiento de |
Campos
El documento options puede contener los siguientes campos y valores:
Campo | Tipo | Descripción |
|---|---|---|
| Documento | Opcional. Indica a Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo
|
| Documento | Opcional. Indica a Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo
|
| string | Opcional. Por defecto, Configura A partir de MongoDB 6.0, puede configurar
|
| string | Opcional. A partir de MongoDB 6.0, puede usar el nuevo campo
|
| Int | Opcional. El número máximo de documentos que se pueden devolver en cada agrupación de un flujo de cambios. Por defecto, Tiene la misma funcionalidad que |
| Int | Opcional. El tiempo máximo en milisegundos que el servidor espera para que los nuevos cambios de datos se informen al cursor del flujo de cambios antes de devolver una agrupación vacía. Se establece por defecto en |
| Documento | Opcional. Pasa un documento de intercalación para especificar una intercalación para el cursor del flujo de cambios. Se utiliza por defecto la comparación binaria |
| booleano | Opcional. A partir de MongoDB 6.0, los flujos de cambios admiten notificaciones de cambios para eventos DDL, como los eventos createIndexes y dropIndexes. Para incluir eventos expandidos en un flujo de cambios, crea el cursor del flujo de cambios utilizando la opción Novedades en la versión 6.0. |
| Marca de tiempo | Opcional. El punto de inicio para el flujo de cambios. Si el punto de inicio especificado está en el pasado, debe encontrarse dentro del rango de tiempo del oplog. Para comprobar el rango temporal del oplog, consulta
|
Tip
Compatibilidad
Este método está disponible en implementaciones alojadas en los siguientes entornos:
MongoDB Atlas: El servicio totalmente gestionado para implementaciones de MongoDB en la nube
Nota
Este comando es compatible con todos los clústeres de MongoDB Atlas. Para obtener información sobre el soporte de Atlas para todos los comandos, consulte Comandos no compatibles.
MongoDB Enterprise: La versión basada en suscripción y autogestionada de MongoDB
MongoDB Community: La versión de MongoDB con código fuente disponible, de uso gratuito y autogestionada.
Disponibilidad
Implementación
db.collection.watch() está disponible para implementaciones de Sets de réplicas y clústeres:
Para un set de réplicas, se puede emitir
db.collection.watch()en cualquier Nodo que contenga datos.Para un clúster particionado, se debe emitir
db.collection.watch()en una instanciamongos.
Motor de almacenamiento
Solo se puede usar db.collection.watch() con el motor de almacenamiento WiredTiger.
Leer Preocupación majority Soporte
Los flujos de cambios están disponibles independientemente del soporte de "majority" de nivel de consistencia de lectura; es decir, el soporte de nivel de consistencia de lectura majority puede estar habilitado (por defecto) o deshabilitado para usar flujos de cambios.
Comportamiento
db.collection.watch()solo notifica a la mayoría de los miembros portadores de datos sobre cambios en los datos que han persistido.El cursor de flujo de cambios permanece abierto hasta que ocurra una de las siguientes situaciones:
El cursor se cierra explícitamente.
Se produce un evento de invalidación; por ejemplo, un descarte o cambio de nombre de una colección.
La conexión a la implementación de MongoDB se cierra o se agota el tiempo de espera. Consulta Comportamiento para obtener más información.
Si la implementación es un clúster particionado, la eliminación de una partición puede provocar que un cursor del flujo de cambios abierto se cierre. Es posible que el cursor cerrado del flujo de cambios no sea completamente reanudable.
Reanudabilidad
A diferencia de los controladores de MongoDB, mongosh no intenta automáticamente reanudar un cursor de flujo de cambios después de un error. Los controladores de MongoDB hacen un intento de reanudar automáticamente un cursor de flujo de cambios después de ciertos errores.
db.collection.watch() utiliza la información almacenada en el oplog para generar la descripción del evento de cambio y crear un token de reanudación asociado a esa operación. Si la operación identificada por el token de reanudación pasado a la opción resumeAfter o startAfter ya ha salido del oplog, db.collection.watch() no puede reanudar el flujo de cambios.
Consulte Reanudar un flujo de cambios para obtener más información sobre cómo reanudar un flujo de cambios.
Nota
No se puede usar
resumeAfterpara reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o cambio de nombre de colección) cierre el flujo. En su lugar, se puede utilizar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.Si la implementación es un clúster particionado, la eliminación de una partición puede provocar que un cursor del flujo de cambios abierto se cierre. Es posible que el cursor cerrado del flujo de cambios no sea completamente reanudable.
Nota
No se puede usar resumeAfter para reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o cambio de nombre de colección) cierre el flujo. En su lugar, se puede utilizar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.
Consulta completa de documentos de operaciones de actualización
Por defecto, el cursor de flujo de cambios devuelve cambios/deltas específicos de campo para las operaciones de actualización. También puedes configurar el flujo de cambios para buscar y devolver la versión actual del documento modificado que ha sido comprometida por la mayoría. Dependiendo de otras operaciones de guardado que puedan haber ocurrido entre la actualización y la búsqueda, el documento devuelto puede diferir significativamente del documento en el momento de la actualización.
Dependiendo del número de cambios aplicados durante la operación de actualización y del tamaño del documento completo, existe el riesgo de que el tamaño del documento de evento de cambio para una operación de actualización sea mayor que el límite de 16 MB para documentos BSON. Si esto ocurre, el servidor cierra el cursor del flujo de cambios y devuelve un error.
Control de acceso
Cuando se ejecuta con control de acceso, el usuario debe tener los privilegios de acción find y changeStream sobre el recurso de colección. Es decir, un usuario debe tener un rol que le conceda el siguiente privilegio:
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
El rol read incorporado proporciona los privilegios apropiados.
Iteración de cursor
MongoDB ofrece múltiples maneras de iterar sobre un cursor.
El cursor.hasNext() método se bloquea y espera el siguiente evento. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza hasNext() de esta manera:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
El cursor.tryNext() método es no bloqueante. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza tryNext() de esta manera:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Ejemplos
Abre un flujo de cambios
La siguiente operación abre un cursor de flujo de cambios en la colección data.sensors:
watchCursor = db.getSiblingDB("data").sensors.watch()
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.tryNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Para obtener la documentación completa sobre la salida del flujo de cambios, consultar Eventos de cambios.
Nota
No puedes usar isExhausted() con flujos de cambios.
Flujo de cambios con búsqueda de actualización de documentos completos
Advertencia
En situaciones que implican eliminaciones rápidas o picos de tráfico, configurar fullDocument: "updateLookup" con un $match filtro puede provocar errores de "Token de currículum no encontrado". Esto ocurre cuando la eliminación de un documento hace que el fullDocument campo devuelva un valor nulo, ya que no hay ningún documento coincidente, lo que impide que el flujo de cambios encuentre el token de currículum.
En su lugar, utilice imágenes previas y posteriores con fullDocumentBeforeChange:
"whenAvailable" y. Consulte fullDocument: "whenAvailable" la sección "Flujos de cambio con imágenes previas y posteriores del documento".
Configurar la opción fullDocument en "updateLookup" para dirigir el cursor del flujo de cambios a buscar la versión más reciente comprometida por mayoría del documento asociado a un evento de actualización del flujo de cambios.
La siguiente operación abre un cursor de flujo de cambios contra la colección data.sensors usando la opción fullDocument : "updateLookup".
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.tryNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Para cualquier operación de actualizar, el evento de cambio devuelve el resultado de la búsqueda del documento en el campo fullDocument.
Para un ejemplo del resultado completo de la actualización del documento, vea evento de actualización del flujo de cambios.
Para obtener la documentación completa sobre la salida del flujo de cambios, consultar Eventos de cambios.
Change Streams con imágenes previas y posteriores de los documentos
A partir de MongoDB 6.0, puedes utilizar eventos de flujo de cambios para generar la versión de un documento antes y después de los cambios (las imágenes previas y posteriores del documento):
La imagen previa es el documento antes de reemplazarlo, actualizarlo o borrarlo. No existe una imagen previa para un documento insertado.
La imagen posterior es el documento tras insertarse, sustituirse o actualizarse. No hay imagen posterior para un documento borrado.
Activa
changeStreamPreAndPostImagespara una colección condb.createCollection(),createocollMod. Por ejemplo, cuando se utiliza el comandocollMod:db.runCommand( { collMod: <collection>, changeStreamPreAndPostImages: { enabled: true } } )
Las imágenes previas y posteriores no están disponibles para un evento de flujo de cambios si las imágenes fueron:
No está habilitado en la colección en el momento de la operación de actualización o eliminación de un documento.
Eliminado después del tiempo de retención de imágenes previas y posteriores establecido en
expireAfterSeconds.El siguiente ejemplo configura
expireAfterSecondsen100segundos en todo el clúster:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) Nota
El comando
setClusterParameterno es compatible con los clústeres de MongoDB Atlas. Para obtener información sobre el soporte de Atlas para todos los comandos, consulta Comandos no compatibles en Atlas.El siguiente ejemplo devuelve la configuración actual de
changeStreamOptions, incluyendoexpireAfterSeconds:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) Configurar
expireAfterSecondsenoffutiliza la política de retención por defecto: las imágenes previas y posteriores se conservan hasta que los eventos correspondientes del flujo de cambios se eliminan del oplog.Si se elimina un evento de flujo de cambios del oplog, las imágenes previas y posteriores correspondientes también se eliminan independientemente del tiempo de retención de las imágenes previas y posteriores
expireAfterSeconds.
Consideraciones adicionales:
Habilitar las imágenes previas y posteriores consume espacio de almacenamiento y aumenta el tiempo de procesamiento. Habilita solo las imágenes previas y de publicación si las necesitas.
Limita el tamaño del evento del flujo de cambios a menos de 16 mebibytes. Para limitar el tamaño del evento, puedes:
Limita el tamaño del documento a 8 megabytes. Puedes solicitar imágenes previas y posteriores simultáneamente en la salida del flujo de cambios si otros campos de eventos del flujo de cambios como
updateDescriptionno son grandes.Solicita solo imágenes posteriores en la salida del flujo de cambios para documentos de hasta 16 mebibytes si otros campos de eventos del flujo de cambios como
updateDescriptionno son grandes.Solicita solo imágenes previas en la salida del flujo de cambios para documentos de hasta 16 mebibytes si:
las actualizaciones de documento afectan solo a una pequeña fracción de la estructura o el contenido del documento, y
no cause un evento de cambio
replace. Un eventoreplacesiempre incluye la imagen de publicación.
Para solicitud de una pre-imagen, configura
fullDocumentBeforeChangeenrequiredowhenAvailableendb.collection.watch(). Para solicitar una imagen de publicación, configurafullDocumentusando el mismo método.Las imágenes previas se escriben en la colección
config.system.preimages.La colección
config.system.preimagespuede agrandarse. Para limitar el tamaño de la colección, puedes establecer el tiempo aexpireAfterSecondspara las imágenes previas como se mostró antes.Las imágenes previas se eliminan de forma asincrónica mediante un proceso en segundo plano.
Importante
Característica incompatible con versiones anteriores
A partir de MongoDB 6.0, si utilizas imágenes previas y posteriores de documentos para los flujos de cambios, debes deshabilitar changeStreamPreAndPostImages para cada colección mediante el comando collMod antes de poder volver a una versión anterior de MongoDB.
Tip
Para los eventos y resultados del flujo de cambios, consulta Eventos de cambio.
Para monitorear una colección en busca de cambios, consulta
db.collection.watch().Para ejemplos completos con la salida del flujo de cambios, consulta Change Streams con imágenes anteriores y posteriores de documentos.
Crear colección
Crear una colección temperatureSensor que tenga changeStreamPreAndPostImages habilitada:
db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } )
Rellenar la colección temperatureSensor con lecturas de temperatura:
db.temperatureSensor.insertMany( [ { "_id" : 0, "reading" : 26.1 }, { "_id" : 1, "reading" : 25.9 }, { "_id" : 2, "reading" : 24.3 }, { "_id" : 3, "reading" : 22.4 }, { "_id" : 4, "reading" : 24.6 } ] )
Las siguientes secciones muestran ejemplos de flujos de cambios para imágenes previas y posteriores de documentos que utilizan la colección temperatureSensor.
Flujo de cambios con imagen anterior del documento
Utilizas la configuración fullDocumentBeforeChange: "whenAvailable" para generar la imagen anterior del documento, si está disponible. La imagen anterior es el documento antes de ser reemplazado, actualizado o borrado. No existe una imagen anterior para un documento insertado.
El siguiente ejemplo crea un cursor de flujo de cambios para la colección temperatureSensor utilizando fullDocumentBeforeChange:
"whenAvailable":
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch( [], { fullDocumentBeforeChange: "whenAvailable" } )
El siguiente ejemplo utiliza el cursor para verificar nuevos eventos de flujo de cambios:
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) { if ( watchCursorFullDocumentBeforeChange.hasNext() ) { printjson( watchCursorFullDocumentBeforeChange.next() ); } }
En el ejemplo:
El bucle
whilese ejecuta hasta que el cursor se cierra.hasNext()devuelvetruesi el cursor contiene documentos.
El siguiente ejemplo actualiza el campo reading de un documento temperatureSensor:
db.temperatureSensor.updateOne( { _id: 2 }, { $set: { reading: 22.1 } } )
Después de que se actualice el documento temperatureSensor, el evento de cambio genera la imagen anterior del documento en el campo fullDocumentBeforeChange. La imagen anterior contiene el campo temperatureSensor del documento reading antes de que se actualizara. Por ejemplo:
{ "_id" : { "_data" : "82624B21...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1649090957, 1), "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 2 }, "updateDescription" : { "updatedFields" : { "reading" : 22.1 }, "removedFields" : [ ], "truncatedArrays" : [ ] }, "fullDocumentBeforeChange" : { "_id" : 2, "reading" : 24.3 } }
Tip
Para obtener detalles de salida de la actualización de documentos, consulte Eventos de actualización del flujo de cambios.
Para obtener detalles sobre la salida del flujo de cambios, consulta Eventos de Cambio.
Flujo de cambios con imagen después de la publicación del documento
Utiliza la configuración fullDocument: "whenAvailable" para generar la imagen posterior del documento, si está disponible. La imagen posterior es el documento después de ser insertado, reemplazado o actualizado. No hay imagen de publicación para un documento borrado.
El siguiente ejemplo crea un cursor de flujo de cambios para la colección temperatureSensor utilizando fullDocument:
"whenAvailable":
watchCursorFullDocument = db.temperatureSensor.watch( [], { fullDocument: "whenAvailable" } )
El siguiente ejemplo utiliza el cursor para verificar nuevos eventos de flujo de cambios:
while ( !watchCursorFullDocument.isClosed() ) { if ( watchCursorFullDocument.hasNext() ) { printjson( watchCursorFullDocument.next() ); } }
En el ejemplo:
El bucle
whilese ejecuta hasta que el cursor se cierra.hasNext()devuelvetruesi el cursor contiene documentos.
El siguiente ejemplo actualiza el campo reading de un documento temperatureSensor:
db.temperatureSensor.updateOne( { _id: 1 }, { $set: { reading: 29.5 } } )
Después de que se actualice el documento temperatureSensor, el evento de cambio genera la imagen de publicación del documento en el campo fullDocument. La imagen posterior contiene el campo temperatureSensor del documento reading después de que fue actualizado. Por ejemplo:
{ "_id" : { "_data" : "8262474D...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1648840090, 1), "fullDocument" : { "_id" : 1, "reading" : 29.5 }, "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 1 }, "updateDescription" : { "updatedFields" : { "reading" : 29.5 }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Tip
Para obtener detalles de salida de la actualización de documentos, consulte Eventos de actualización del flujo de cambios.
Para obtener detalles sobre la salida del flujo de cambios, consulta Eventos de Cambio.
Flujo de cambios con filtro de canalización de agregación
Nota
A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación de flujos de cambios modifica el campo _id de un evento.
La siguiente operación abre un cursor de flujo de cambios en la colección data.sensors utilizando una canalización de agregación para filtrar solo los eventos insert:
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.hasNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
El cursor de flujo de cambios solo devuelve eventos de cambio donde el operationType es insert. Para obtener la documentación completa sobre la salida del flujo de cambios, consulta Eventos de cambio.
Reanudar un flujo de cambios
Cada documento devuelto por un cursor de flujo de cambios incluye un token de reanudación como el campo _id. Para reanudar un flujo de cambios, pase el documento _id completo del evento de cambio del que desea reanudar a la opción resumeAfter o startAfter de watch().
La siguiente operación reanuda un cursor de flujo de cambios en la colección data.sensors utilizando un token de reanudación. Esto asume que la operación que generó el token de reanudación no se ha eliminado del oplog del clúster.
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.hasNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
Consulta Reanudar un flujo de cambios para obtener documentación completa sobre cómo reanudar un flujo de cambios.