Docs Menu
Docs Home
/ /

Supervisar los cambios en los datos

En esta guía, puedes aprender a usar un flujo de cambios para supervisar los cambios en tiempo real de tus datos. Un flujo de cambios es una funcionalidad de MongoDB Server que permite que la aplicación se suscriba a cambios en los datos de una colección, base de datos o implementación.

Cuando se utiliza el controlador Scala, se puede llamar a la watch() método para devolver una instancia de ChangeStreamObservable. Luego, puedes suscribirte a la instancia de ChangeStreamObservable para ver los cambios en los datos nuevos, como actualizaciones, inserciones y eliminaciones.

Los ejemplos de esta guía usan la colección restaurants en la base de datos sample_restaurants de la Conjuntos de datos de muestra de Atlas. Para acceder a esta colección desde tu aplicación Scala, crea un MongoClient que se conecte a un clúster de Atlas y asigna los siguientes valores a tus variables database y collection:

val database: MongoDatabase = client.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

Tip

Para aprender cómo crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de ejemplo, consulta la guía MongoDB Get Started.

Algunos ejemplos utilizan instancias de la clase LatchedObserver para gestionar eventos del flujo de cambios. Esta clase es un observador personalizado que imprime eventos de flujo de cambios y continúa supervisando los cambios de datos hasta que el stream termina o genera un error. Para usar la clase LatchedObserver, pega el siguiente código en tu archivo de aplicación:

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
val latch = new CountDownLatch(1)
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
override def onError(throwable: Throwable): Unit = {
println(s"Error: '$throwable")
latch.countDown()
}
override def onComplete(): Unit = latch.countDown()
def await(): Unit = latch.await()
}

Para abrir un flujo de cambios, llama al método watch(). La instancia en la que se llama al método watch() determina el alcance de los eventos que el flujo de cambios supervisa. Puedes llamar al método watch() en instancias de las siguientes clases:

  • MongoClientMonitorea los cambios en todas las colecciones de todas las bases de datos de una implementación, excluyendo colecciones del sistema o colecciones en las bases de datos admin, local y config

  • MongoDatabase: Supervisa los cambios en todas las colecciones de una base de datos

  • MongoCollection: Supervisa los cambios en una colección

El siguiente ejemplo llama al método watch() para abrir un flujo de cambios en la colección restaurants. El código crea una instancia de LatchedObserver para recibir y realizar cambios a medida que ocurra:

val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await()

Para comenzar a observar cambios, ejecuta el código anterior. Luego, en una shell separada, ejecuta el siguiente código para actualizar un documento que tenga un valor de campo name igual a "Blarney Castle":

val filter = equal("name", "Blarney Castle")
val update = set("cuisine", "American")
collection.updateOne(filter, update)
.subscribe((res: UpdateResult) => println(res),
(e: Throwable) => println(s"There was an error: $e"))

Cuando ejecuta el código anterior para actualizar la colección, la aplicación de flujo de cambios imprime el cambio a medida que ocurre. El evento de cambio impreso se asemeja a la siguiente salida:

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null,
fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}},
clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[],
updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null},
txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}

Para modificar la salida del flujo de cambios, se puede pasar una lista de etapas de la pipeline como parámetro al método watch(). Puede incluir las siguientes etapas en la lista:

  • $addFields o $set: Agrega nuevos campos a los documentos

  • $matchFiltra los documentos

  • $project: Proyecta un subconjunto de los campos del documento

  • $replaceWith o $replaceRoot: Reemplaza el documento de entrada por el documento especificado

  • $redact: Restringe los contenidos de los documentos

  • $unsetRemueve campos de documentos

El driver de Scala proporciona la clase Aggregates, que incluye métodos asistentes para construir las etapas anteriores del pipeline.

Tip

Para aprender más sobre las etapas de pipeline y sus métodos asistentes Aggregates correspondientes, consulta los siguientes recursos:

El siguiente ejemplo crea un pipeline que utiliza el método Aggregates.filter() para compilar la etapa $match. Luego, el código pasa este pipeline al método watch() e instruye a watch() salir de eventos solo cuando ocurren operaciones de actualización:

val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update"))))
observer.await()

Puede modificar el comportamiento del método watch() encadenando los métodos proporcionados por la clase ChangeStreamObservable. La siguiente tabla describe algunos de estos métodos:

Método
Descripción

fullDocument()

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide.

fullDocumentBeforeChange()

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

comment()

Attaches a comment to the operation.

startAtOperationTime()

Instructs the change stream to provide only changes that occurred at or after the specified timestamp.

collation()

Sets the collation to use for the change stream cursor.

Para una lista completa de las opciones de watch(), consulte ChangeStreamObservable en la documentación de la API.

Importante

Solo puede habilitar preimagenes y postimagenes en colecciones si su implementación usa MongoDB Server v6.0 o posterior.

Por defecto, al ejecutar una operación en una colección, el evento de cambio correspondiente solo incluye los campos modificados y sus valores antes y después de la operación.

Puedes indicar a watch() que devuelva la pre-imagen del documento, es decir, la versión completa del documento antes de los cambios, además de los campos modificados. Para incluir la preimagen en el evento de flujo de cambios, encadena el método fullDocumentBeforeChange() a watch(). Pasa uno de los siguientes valores al método fullDocumentBeforeChange():

  • FullDocumentBeforeChange.WHEN_AVAILABLEEl evento de cambio incluye una preimagen del documento modificado para los eventos de cambio. Si la imagen previa no está disponible, este campo de evento de cambio tiene un valor de null.

  • FullDocumentBeforeChange.REQUIRED: El evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el servidor genera un error.

También puedes indicar al método watch() que devuelva la imagen posterior del documento, que es la versión completa del documento después de los cambios, además de los campos modificados. Para incluir la imagen posterior en el evento del flujo de cambios, concatenar el método fullDocument() al watch(). Pasa uno de los siguientes valores al método fullDocument():

  • FullDocument.UPDATE_LOOKUP: El evento de cambio incluye una copia de todo el documento cambiado a partir de cierto tiempo después del cambio.

  • FullDocument.WHEN_AVAILABLEEl evento de cambio incluye una imagen posterior del documentos modificado para eventos de cambio. Si la imagen posterior no está disponible, este campo de evento de cambio tiene un valor de null.

  • FullDocument.REQUIREDEl evento de cambio incluye una imagen posterior del documento modificado para las eventos de cambio. Si la imagen posterior no está disponible, el servidor genera un error.

El siguiente ejemplo llama al método watch() en una colección e incluye la posterior imagen de documentos actualizados encadenando el método fullDocument():

val observer = LatchedObserver()
collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
.subscribe(observer)
observer.await()

Con la aplicación de flujo de cambios ejecutándose en un shell diferente, actualizar un documento en la colección restaurants usando el ejemplo de actualización anterior imprime un evento de cambio que se asemeja al siguiente resultado:

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null,
fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24",
"coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard",
"zipcode": "11697"}), (borough,BsonString{value='Queens'}),
(cuisine,BsonString{value='Irish'}), (grades,BsonArray{values=[...]}),
(name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}),
(blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null,
documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=
UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{...}}

Tip

Para obtener más información sobre pre-imágenes y post-imágenes, consulta Change Streams con preimágenes y postimágenes de documentos en el manual de MongoDB Server.

Para saber más sobre las change streams, consulta Change Streams en el manual de MongoDB Server.

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API:

Volver

Contabilizar documentos

En esta página