Docs Menu
Docs Home
/ /

Supervisar los cambios en los datos

En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en sus datos en tiempo real. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.

Al utilizar el controlador Scala, puede llamar al watch() Método para devolver una instancia de ChangeStreamObservable. Luego, puede suscribirse a la instancia ChangeStreamObservable para ver nuevos cambios en los datos, como actualizaciones, inserciones y eliminaciones.

Los ejemplos de esta guía utilizan la colección restaurants en la base de datos sample_restaurants de la Conjuntos de datos de muestra de Atlas. Para acceder a esta colección desde su aplicación Scala, cree un MongoClient que se conecte a un clúster de Atlas y asigne los siguientes valores a las variables database collection y:

val database: MongoDatabase = client.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

Tip

Para aprender cómo crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulte la guía de introducción a MongoDB.

Algunos ejemplos utilizan instancias de la clase LatchedObserver para gestionar eventos del flujo de cambios. Esta clase es un observador personalizado que imprime eventos de flujo de cambios y continúa supervisando los cambios de datos hasta que el stream termina o genera un error. Para usar la clase LatchedObserver, pega el siguiente código en tu archivo de aplicación:

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
val latch = new CountDownLatch(1)
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
override def onError(throwable: Throwable): Unit = {
println(s"Error: '$throwable")
latch.countDown()
}
override def onComplete(): Unit = latch.countDown()
def await(): Unit = latch.await()
}

Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método watch() determina el alcance de los eventos que supervisa el flujo de cambios. Puede llamar al método watch() en instancias de las siguientes clases:

  • MongoClient:Supervisa los cambios en todas las colecciones en todas las bases de datos de una implementación, excluyendo las colecciones del sistema o las colecciones en las admin local config bases de datos, y

  • MongoDatabase:Monitorea los cambios en todas las colecciones en una base de datos

  • MongoCollection:Monitorea los cambios en una colección

El siguiente ejemplo llama al método watch() para abrir un flujo de cambios en la colección restaurants. El código crea una instancia LatchedObserver para recibir y generar los cambios a medida que ocurren:

val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await()

Para empezar a detectar cambios, ejecute el código anterior. Luego, en un shell independiente, ejecute el siguiente código para actualizar un documento cuyo valor de campo name sea "Blarney Castle":

val filter = equal("name", "Blarney Castle")
val update = set("cuisine", "American")
collection.updateOne(filter, update)
.subscribe((res: UpdateResult) => println(res),
(e: Throwable) => println(s"There was an error: $e"))

Cuando ejecuta el código anterior para actualizar la colección, la aplicación de flujo de cambios imprime el cambio a medida que ocurre. El evento de cambio impreso se asemeja a la siguiente salida:

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null,
fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}},
clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[],
updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null},
txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}

Para modificar la salida del flujo de cambios, puede pasar una lista de etapas de la canalización como parámetro al método watch(). Puede incluir las siguientes etapas en la lista:

  • $addFields o $set: Agrega nuevos campos a los documentos

  • $match:Filtra los documentos

  • $project: Proyecta un subconjunto de los campos del documento

  • $replaceWith o $replaceRoot: reemplaza el documento de entrada con el documento especificado

  • $redact:Restringe el contenido de los documentos

  • $unset:Elimina campos de los documentos

El controlador Scala proporciona la clase Aggregates, que incluye métodos auxiliares para construir las etapas de la canalización anteriores.

Tip

Para obtener más información sobre las etapas de la canalización y sus métodos auxiliares Aggregates correspondientes, consulte los siguientes recursos:

El siguiente ejemplo crea un pipeline que utiliza el método Aggregates.filter() para compilar la etapa $match. Luego, el código pasa este pipeline al método watch() e instruye a watch() salir de eventos solo cuando ocurren operaciones de actualización:

val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update"))))
observer.await()

Puede modificar el comportamiento del método watch() encadenando los métodos proporcionados por la clase ChangeStreamObservable. La siguiente tabla describe algunos de estos métodos:

Método
Descripción

fullDocument()

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide.

fullDocumentBeforeChange()

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

comment()

Attaches a comment to the operation.

startAtOperationTime()

Instructs the change stream to provide only changes that occurred at or after the specified timestamp.

collation()

Sets the collation to use for the change stream cursor.

Para una lista completa de las opciones de watch(), consulte ChangeStreamObservable en la documentación de la API.

Importante

Puede habilitar imágenes previas y posteriores en colecciones solo si su implementación usa MongoDB Server v6.0 o posterior.

De forma predeterminada, cuando se realiza una operación en una colección, el evento de cambio correspondiente incluye solo los campos modificados y sus valores antes y después de la operación.

Puede indicar al método que devuelva watch() la preimagen del documento, la versión completa del documento antes de los cambios, además de los campos modificados. Para incluir la preimagen en el evento de flujo de cambios, encadene el fullDocumentBeforeChange() método watch() a. Pase uno de los siguientes valores al fullDocumentBeforeChange() método:

  • FullDocumentBeforeChange.WHEN_AVAILABLEEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, este campo de evento de cambio tiene un valor null.

  • FullDocumentBeforeChange.REQUIRED: El evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el servidor genera un error.

También puede watch() indicar al método que devuelva la imagen posterior del documento, la versión completa del documento después de los cambios, además de los campos modificados. Para incluir la imagen posterior en el evento de flujo de cambios, encadene el fullDocument() método watch() a. Pase uno de los siguientes valores al fullDocument() método:

  • FullDocument.UPDATE_LOOKUP:El evento de cambio incluye una copia de todo el documento modificado desde algún tiempo después del cambio.

  • FullDocument.WHEN_AVAILABLEEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, este campo de evento de cambio tiene un valor null.

  • FullDocument.REQUIREDEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el servidor genera un error.

El siguiente ejemplo llama al método watch() en una colección e incluye la imagen posterior de los documentos actualizados encadenando el método fullDocument():

val observer = LatchedObserver()
collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
.subscribe(observer)
observer.await()

Con la aplicación de flujo de cambios ejecutándose en un shell separado, actualizar un documento en la restaurants colección mediante el ejemplo de actualización anterior imprime un evento de cambio que se parece a la siguiente salida:

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null,
fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24",
"coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard",
"zipcode": "11697"}), (borough,BsonString{value='Queens'}),
(cuisine,BsonString{value='Irish'}), (grades,BsonArray{values=[...]}),
(name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}),
(blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null,
documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=
UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{...}}

Tip

Para obtener más información sobre imágenes previas y posteriores, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual de MongoDB Server.

Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB Server.

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API:

Volver

Contabilizar documentos

En esta página