Docs Menu
Docs Home
/ /

Mongo.watch() (método mongosh)

Mongo.watch( pipeline, options )

Solo para Sets de réplicas y clústeres fragmentados

Abre una cambie el cursor de flujo de un conjunto de réplicas o un clúster fragmentado para informar sobre todas sussystem colecciones que no sean en sus bases de datos, con la excepción admin delocal las config bases de datos, y.

Parameter
Tipo
Descripción

pipeline

arreglo

Opcional. Una canalización de agregación que consiste en una o más de las siguientes etapas de agregación:

Especifica un pipeline para filtrar o modificar la salida de los eventos de cambio.

A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación de flujos de cambios modifica el campo _id de un evento.

options

Documento

Opcional. Opciones adicionales que modifican el comportamiento de Mongo.watch().

El documento options puede contener los siguientes campos y valores:

Campo
Tipo
Descripción

resumeAfter

Documento

Opcional. Indica a Mongo.watch() que intente reanudar las notificaciones comenzando después de la operación especificada en el token de reanudación.

Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo _id. Pasa todo el campo _id del documento de evento de cambio que representa la operación que deseas reanudar después.

resumeAfter es mutuamente excluyente con startAfter y startAtOperationTime.

startAfter

Documento

Opcional. Indica a Mongo.watch() que intente iniciar un nuevo flujo de cambios después de la operación especificada en el token de reanudación. Permite que las notificaciones se reanuden después de un evento de invalidación.

Cada documento de evento de flujo de cambios incluye un token de reanudación como el campo _id. Pasa todo el campo _id del documento de evento de cambio que representa la operación que deseas reanudar después.

startAfter es mutuamente excluyente con resumeAfter y startAtOperationTime.

fullDocument

string

Opcional. Por defecto, Mongo.watch() devuelve el delta de los campos que han sido modificados por una operación de actualización, en lugar de todo el documento actualizado.

Configura fullDocument en "updateLookup" para dirigir a Mongo.watch() a buscar la versión más reciente del documento actualizado que haya sido comprometida por la mayoría. Mongo.watch() devuelve un campo fullDocument con la búsqueda de documentos además del delta de updateDescription.

batchSize

Int

Opcional. Especifica el número máximo de eventos de cambio que se devolverán en cada lote de la respuesta del clúster MongoDB.

Tiene la misma funcionalidad que cursor.batchSize().

maxAwaitTimeMS

Int

Opcional. El tiempo máximo en milisegundos que el servidor espera para que los nuevos cambios de datos se informen al cursor del flujo de cambios antes de devolver una agrupación vacía.

Se establece por defecto en 1000 milisegundos.

collation

Documento

Opcional. Pase un documento de intercalación para especificar la intercalación del cursor del flujo de cambios.

Si se omite, el valor predeterminado es la comparación binaria simple.

startAtOperationTime

Marca de tiempo

Opcional. El punto de inicio para el flujo de cambios. Si el punto de inicio especificado está en el pasado, debe encontrarse dentro del rango de tiempo del oplog. Para comprobar el rango temporal del oplog, consulta rs.printReplicationInfo().

startAtOperationTime es mutuamente excluyente con resumeAfter y startAfter.

Devuelve:Un cursor sobre los documentos de eventos de cambio. Consulte Eventos de cambio para ver ejemplos de documentos de eventos de cambio.

Tip

Este método está disponible en implementaciones alojadas en los siguientes entornos:

  • MongoDB Atlas: El servicio totalmente gestionado para implementaciones de MongoDB en la nube

  • MongoDB Enterprise: La versión basada en suscripción y autogestionada de MongoDB

  • MongoDB Community: La versión de MongoDB con código fuente disponible, de uso gratuito y autogestionada.

Mongo.watch() Está disponible para conjuntos de réplicas y clústeres fragmentados:

  • Para un set de réplicas, se puede emitir Mongo.watch() en cualquier Nodo que contenga datos.

  • Para un clúster particionado, se debe emitir Mongo.watch() en una instancia mongos.

Solo se puede usar Mongo.watch() con el motor de almacenamiento WiredTiger.

Los flujos de cambios están disponibles independientemente del soporte de "majority" de nivel de consistencia de lectura; es decir, el soporte de nivel de consistencia de lectura majority puede estar habilitado (por defecto) o deshabilitado para usar flujos de cambios.

  • Mongo.watch() solo notifica a la mayoría de los miembros portadores de datos sobre cambios en los datos que han persistido.

  • El cursor de flujo de cambios permanece abierto hasta que ocurra una de las siguientes situaciones:

    • El cursor se cierra explícitamente.

    • Se produce un evento de invalidación; por ejemplo, un descarte o cambio de nombre de una colección.

    • La conexión a la implementación de MongoDB se cierra o se agota el tiempo de espera. Consulta Comportamiento para obtener más información.

    • Si la implementación es un clúster particionado, la eliminación de una partición puede provocar que un cursor del flujo de cambios abierto se cierre. Es posible que el cursor cerrado del flujo de cambios no sea completamente reanudable.

A diferencia de los controladores de MongoDB, mongosh no intenta automáticamente reanudar un cursor de flujo de cambios después de un error. Los controladores de MongoDB hacen un intento de reanudar automáticamente un cursor de flujo de cambios después de ciertos errores.

Mongo.watch() utiliza la información almacenada en el oplog para generar la descripción del evento de cambio y crear un token de reanudación asociado a esa operación. Si la operación identificada por el token de reanudación pasado a la opción resumeAfter o startAfter ya ha salido del oplog, Mongo.watch() no puede reanudar el flujo de cambios.

Consulte Reanudar un flujo de cambios para obtener más información sobre cómo reanudar un flujo de cambios.

Nota

  • No se puede usar resumeAfter para reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o cambio de nombre de colección) cierre el flujo. En su lugar, se puede utilizar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.

  • Si la implementación es un clúster particionado, la eliminación de una partición puede provocar que un cursor del flujo de cambios abierto se cierre. Es posible que el cursor cerrado del flujo de cambios no sea completamente reanudable.

Nota

Token de reanudación

Con los tokens de reanudación de cadena con codificación hexadecimal, puede compararlos y ordenarlos. Puede usar tokens de reanudación de BinData o de cadena hexadecimal para reanudar un flujo de cambios.

Los nuevos formatos de token de currículum introducidos en una versión de MongoDB no pueden ser utilizados por versiones anteriores de MongoDB.

MongoDB proporciona un "fragmento", una extensión de mongosh, que descodifica los tokens de reanudación codificados en hexadecimal.

Puede instalar y ejecutar el resumetokenfragmento mongosh de:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

También puede ejecutar resumetoken desde la línea de comandos (sin utilizar mongosh) si npm está instalado en su sistema:

npx mongodb-resumetoken-decoder <RESUME TOKEN>

Consulte lo siguiente para más detalles sobre:

Por defecto, el cursor de flujo de cambios devuelve cambios/deltas específicos de campo para las operaciones de actualización. También puedes configurar el flujo de cambios para buscar y devolver la versión actual del documento modificado que ha sido comprometida por la mayoría. Dependiendo de otras operaciones de guardado que puedan haber ocurrido entre la actualización y la búsqueda, el documento devuelto puede diferir significativamente del documento en el momento de la actualización.

Dependiendo del número de cambios aplicados durante la operación de actualización y del tamaño del documento completo, existe el riesgo de que el tamaño del documento de evento de cambio para una operación de actualización sea mayor que el límite de 16 MB para documentos BSON. Si esto ocurre, el servidor cierra el cursor del flujo de cambios y devuelve un error.

Los flujos de cambios están disponibles independientemente del soporte de "majority" de nivel de consistencia de lectura; es decir, el soporte de nivel de consistencia de lectura majority puede estar habilitado (por defecto) o deshabilitado para usar flujos de cambios.

Al ejecutar con control de acceso, el usuario debe tener las acciones de privilegio find y changeStream en todas las colecciones que no sean del sistema en todas las bases de datos. Es decir, un usuario debe tener un rol que otorgue el siguiente privilegio:

{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

El rol read incorporado proporciona los privilegios apropiados.

MongoDB ofrece múltiples maneras de iterar sobre un cursor.

El cursor.hasNext() método se bloquea y espera el siguiente evento. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza hasNext() de esta manera:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

El cursor.tryNext() método es no bloqueante. Para supervisar el cursor watchCursor e iterar sobre los eventos, utiliza tryNext() de esta manera:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

La siguiente operación en abre un cursor de flujo de cambios en un conjunto de réplicas. El cursor devuelto informa sobre los cambios de datos en todas las colecciones que mongosh nosystem pertenecen a en todas las bases de datos, excepto en admin locallas config bases de datos, y.

watchCursor = db.getMongo().watch()

Itera el cursor para verificar si hay nuevos eventos. Utiliza el método cursor.isClosed() junto con el método cursor.tryNext() para asegurarte de que el bucle solo salga si el cursor del flujo de cambios está cerrado y no quedan objetos en el último grupo:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

Para obtener la documentación completa sobre la salida del flujo de cambios, consultar Eventos de cambios.

Nota

No puedes usar isExhausted() con flujos de cambios.

Volver

Mongo.setWriteConcern

En esta página