Definición
Mongo.watch( pipeline, options )Solo para Sets de réplicas y clústeres fragmentados
Abre un(a) cambiar el cursor del flujo de cambios para un set de réplicas o un clúster para informar sobre todas sus colecciones no-
systemen sus bases de datos, con la excepción de las bases de datosadmin,localyconfig.ParameterTipoDescripciónpipelinearreglo
Opcional. Una canalización de agregación que consiste en una o más de las siguientes etapas de agregación:
Especifica un pipeline para filtrar o modificar la salida de los eventos de cambio.
A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación de flujos de cambios modifica el campo _id de un evento.
optionsDocumento
Opcional. Opciones adicionales que modifican el comportamiento de
Mongo.watch().El documento
optionspuede contener los siguientes campos y valores:CampoTipoDescripciónresumeAfterDocumento
Opcional. Indica a
Mongo.watch()que intente reanudar las notificaciones comenzando después de la operación especificada en el token de reanudación.Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo
_id. Pasa todo el campo_iddel documento de evento de cambio que representa la operación que deseas reanudar después.resumeAfteres mutuamente excluyente constartAfterystartAtOperationTime.startAfterDocumento
Opcional. Indica a
Mongo.watch()que intente iniciar un nuevo flujo de cambios después de la operación especificada en el token de reanudación. Permite que las notificaciones se reanuden después de un evento de invalidación.Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo
_id. Pasa todo el campo_iddel documento de evento de cambio que representa la operación que deseas reanudar después.startAfteres mutuamente excluyente conresumeAfterystartAtOperationTime.fullDocumentstring
Opcional. Por defecto,
Mongo.watch()devuelve el delta de los campos que han sido modificados por una operación de actualización, en lugar de todo el documento actualizado.Configura
fullDocumenten"updateLookup"para dirigir aMongo.watch()a buscar la versión más reciente del documento actualizado que haya sido comprometida por la mayoría.Mongo.watch()devuelve un campofullDocumentcon la búsqueda de documentos además del delta deupdateDescription.batchSizeInt
opcional. Especifica el número máximo de eventos de cambio a devolver en cada agrupar de la respuesta del clúster de MongoDB.
Tiene la misma funcionalidad que
cursor.batchSize().maxAwaitTimeMSInt
Opcional. El tiempo máximo en milisegundos que el servidor espera para que los nuevos cambios de datos se informen al cursor del flujo de cambios antes de devolver una agrupación vacía.
Se establece por defecto en
1000milisegundos.collationDocumento
opcional. Pase un documento de intercalación para especificar la intercalación para el cursor del flujo de cambios.
Si se omite, por defecto se realizará la comparación binaria
simple.startAtOperationTimeMarca de tiempo
Opcional. El punto de inicio para el flujo de cambios. Si el punto de inicio especificado está en el pasado, debe encontrarse dentro del rango de tiempo del oplog. Para comprobar el rango temporal del oplog, consulta
rs.printReplicationInfo().startAtOperationTimees mutuamente excluyente conresumeAfterystartAfter.Devuelve: Un cursor sobre los documentos de eventos de cambio. Consulta Eventos de cambio para ver ejemplos de documentos de eventos de cambio.
Compatibilidad
Este método está disponible en implementaciones alojadas en los siguientes entornos:
MongoDB Atlas: El servicio totalmente gestionado para implementaciones de MongoDB en la nube
MongoDB Enterprise: La versión basada en suscripción y autogestionada de MongoDB
MongoDB Community: La versión de MongoDB con código fuente disponible, de uso gratuito y autogestionada.
Disponibilidad
Implementación
Mongo.watch() está disponible para sets de réplicas y clústeres:
Para un set de réplicas, se puede emitir
Mongo.watch()en cualquier Nodo que contenga datos.Para un clúster particionado, se debe emitir
Mongo.watch()en una instanciamongos.
Motor de almacenamiento
Solo se puede usar Mongo.watch() con el motor de almacenamiento WiredTiger.
Nivel de consistencia de lectura majority Support
Los flujos de cambios están disponibles independientemente del soporte de "majority" de nivel de consistencia de lectura; es decir, el soporte de nivel de consistencia de lectura majority puede estar habilitado (por defecto) o deshabilitado para usar flujos de cambios.
Comportamiento
Mongo.watch()solo notifica a la mayoría de los miembros portadores de datos sobre cambios en los datos que han persistido.El cursor de flujo de cambios permanece abierto hasta que ocurra una de las siguientes situaciones:
El cursor se cierra explícitamente.
Se produce un evento de invalidación; por ejemplo, un descarte o cambio de nombre de una colección.
La conexión a la implementación de MongoDB se cierra o se agota el tiempo de espera. Consulta Comportamiento para obtener más información.
Si la implementación es un clúster particionado, la eliminación de una partición puede provocar que un cursor del flujo de cambios abierto se cierre. Es posible que el cursor cerrado del flujo de cambios no sea completamente reanudable.
Reanudabilidad
A diferencia de los controladores de MongoDB, mongosh no intenta automáticamente reanudar un cursor de flujo de cambios después de un error. Los controladores de MongoDB hacen un intento de reanudar automáticamente un cursor de flujo de cambios después de ciertos errores.
Mongo.watch() utiliza la información almacenada en el oplog para generar la descripción del evento de cambio y crear un token de reanudación asociado a esa operación. Si la operación identificada por el token de reanudación pasado a la opción resumeAfter o startAfter ya ha salido del oplog, Mongo.watch() no puede reanudar el flujo de cambios.
Consulte Reanudar un flujo de cambios para obtener más información sobre cómo reanudar un flujo de cambios.
Nota
No se puede usar
resumeAfterpara reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o cambio de nombre de colección) cierre el flujo. En su lugar, se puede utilizar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.Si la implementación es un clúster particionado, la eliminación de una partición puede provocar que un cursor del flujo de cambios abierto se cierre. Es posible que el cursor cerrado del flujo de cambios no sea completamente reanudable.
Nota
Token de reanudación
Tokens codificados en hexadecimal
Con los tokens de reanudación codificados en hex, puedes comparar y ordenar los tokens de reanudación. Puedes utilizar tanto los tokens de reanudación BinData como los tokens de reanudación en string hexadecimal para reanudar un flujo de cambios.
Los nuevos formatos de token de reanudación introducidos en una versión de MongoDB no pueden ser utilizados por versiones anteriores de MongoDB.
Decodificar tokens de reanudación
MongoDB proporciona un "fragmento", una extensión de mongosh, que descodifica los tokens de reanudación codificados en hexadecimal.
Puedes instalar y ejecutar el token de reanudación snippet de mongosh:
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
También puede ejecutar resumetoken desde la línea de comandos (sin utilizar mongosh) si npm está instalado en su sistema:
npx mongodb-resumetoken-decoder <RESUME TOKEN>
Consulte lo siguiente para más detalles sobre:
Consulta completa de documentos de operaciones de actualización
Por defecto, el cursor de flujo de cambios devuelve cambios/deltas específicos de campo para las operaciones de actualización. También puedes configurar el flujo de cambios para buscar y devolver la versión actual del documento modificado que ha sido comprometida por la mayoría. Dependiendo de otras operaciones de guardado que puedan haber ocurrido entre la actualización y la búsqueda, el documento devuelto puede diferir significativamente del documento en el momento de la actualización.
Dependiendo del número de cambios aplicados durante la operación de actualización y del tamaño del documento completo, existe el riesgo de que el tamaño del documento de evento de cambio para una operación de actualización sea mayor que el límite de 16 MB para documentos BSON. Si esto ocurre, el servidor cierra el cursor del flujo de cambios y devuelve un error.
Disponibilidad
Los flujos de cambios están disponibles independientemente del soporte de "majority" de nivel de consistencia de lectura; es decir, el soporte de nivel de consistencia de lectura majority puede estar habilitado (por defecto) o deshabilitado para usar flujos de cambios.
Control de acceso
Al ejecutar con control de acceso, el usuario debe tener las acciones de privilegio find y changeStream en todas las colecciones que no sean del sistema en todas las bases de datos. Es decir, un usuario debe tener un rol que otorgue el siguiente privilegio:
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
El rol read incorporado proporciona los privilegios apropiados.
Iteración de cursor
MongoDB ofrece múltiples maneras de iterar sobre un cursor.
El cursor.hasNext() método se bloquea y espera el siguiente evento. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza hasNext() de esta manera:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
El cursor.tryNext() método es no bloqueante. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza tryNext() de esta manera:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Ejemplo
La siguiente operación en mongosh abre un cursor de flujo de cambios en un set de réplicas. El cursor devuelto informa sobre los cambios en los datos en todas las colecciones que no sonsystem, en todas las bases de datos excepto en las bases de datos admin, local y config.
watchCursor = db.getMongo().watch()
Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.tryNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Para obtener la documentación completa sobre la salida del flujo de cambios, consultar Eventos de cambios.
Nota
No puedes usar isExhausted() con flujos de cambios.