Overview
En esta guía, puedes aprender a usar un flujo de cambios para supervisar los cambios en tiempo real de tus datos. Un flujo de cambios es una funcionalidad de MongoDB Server que permite que la aplicación se suscriba a cambios en los datos de una colección, base de datos o implementación.
Cuando se utiliza el controlador Scala, se puede llamar a la watch() método para devolver una instancia de ChangeStreamObservable. Luego, puedes suscribirte a la instancia de ChangeStreamObservable para ver los cambios en los datos nuevos, como actualizaciones, inserciones y eliminaciones.
Datos de muestra
Los ejemplos de esta guía usan la colección restaurants en la base de datos sample_restaurants de la Conjuntos de datos de muestra de Atlas. Para acceder a esta colección desde su aplicación Scala, cree un MongoClient que se conecte a un clúster de Atlas y asigne los siguientes valores a las variables database collection y:
val database: MongoDatabase = client.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
Tip
Para aprender cómo crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de ejemplo, consulta la guía MongoDB Get Started.
Algunos ejemplos utilizan instancias de la clase LatchedObserver para gestionar eventos del flujo de cambios. Esta clase es un observador personalizado que imprime eventos de flujo de cambios y continúa supervisando los cambios de datos hasta que el stream termina o genera un error. Para usar la clase LatchedObserver, pega el siguiente código en tu archivo de aplicación:
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() }
Abre un flujo de cambios
Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método watch() determina el alcance de los eventos que supervisa el flujo de cambios. Puede llamar al método watch() en instancias de las siguientes clases:
MongoClient:Supervisa los cambios en todas las colecciones en todas las bases de datos de una implementación, excluyendo las colecciones del sistema o las colecciones en lasadminlocalconfigbases de datos, yMongoDatabase: Supervisa los cambios en todas las colecciones de una base de datosMongoCollection: Supervisa los cambios en una colección
El siguiente ejemplo llama al método watch() para abrir un flujo de cambios en la colección restaurants. El código crea una instancia de LatchedObserver para recibir y realizar cambios a medida que ocurra:
val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await()
Para comenzar a observar cambios, ejecuta el código anterior. Luego, en una shell separada, ejecuta el siguiente código para actualizar un documento que tenga un valor de campo name igual a "Blarney Castle":
val filter = equal("name", "Blarney Castle") val update = set("cuisine", "American") collection.updateOne(filter, update) .subscribe((res: UpdateResult) => println(res), (e: Throwable) => println(s"There was an error: $e"))
Cuando ejecuta el código anterior para actualizar la colección, la aplicación de flujo de cambios imprime el cambio a medida que ocurre. El evento de cambio impreso se asemeja a la siguiente salida:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
Modificar la salida del flujo de cambios
Para modificar la salida del flujo de cambios, se puede pasar una lista de etapas de la pipeline como parámetro al método watch(). Puede incluir las siguientes etapas en la lista:
$addFieldso$set: Agrega nuevos campos a los documentos$matchFiltra los documentos$project: Proyecta un subconjunto de los campos del documento$replaceWitho$replaceRoot: Reemplaza el documento de entrada por el documento especificado$redact:Restringe el contenido de los documentos$unsetRemueve campos de documentos
El controlador Scala proporciona la clase Aggregates, que incluye métodos auxiliares para construir las etapas de la canalización anteriores.
Tip
Para aprender más sobre las etapas de pipeline y sus métodos asistentes Aggregates correspondientes, consulta los siguientes recursos:
Aggregation Stages en el manual del servidor de MongoDB
Agregados en la documentación de la API
El siguiente ejemplo crea un pipeline que utiliza el método Aggregates.filter() para compilar la etapa $match. Luego, el código pasa este pipeline al método watch() e instruye a watch() salir de eventos solo cuando ocurren operaciones de actualización:
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update")))) observer.await()
Modificar el comportamiento de watch()
Puede modificar el comportamiento del método watch() encadenando los métodos proporcionados por la clase ChangeStreamObservable. La siguiente tabla describe algunos de estos métodos:
Método | Descripción |
|---|---|
| Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see the Include Pre-Images and Post-Images section of this
guide. |
| Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Attaches a comment to the operation. |
| Instructs the change stream to provide only changes that occurred at or after
the specified timestamp. |
| Sets the collation to use for the change stream cursor. |
Para una lista completa de las opciones de watch(), consulte ChangeStreamObservable en la documentación de la API.
Incluir imágenes preoperativas y postoperatorias
Importante
Solo puede habilitar preimagenes y postimagenes en colecciones si su implementación usa MongoDB Server v6.0 o posterior.
De forma predeterminada, cuando se realiza una operación en una colección, el evento de cambio correspondiente incluye solo los campos modificados y sus valores antes y después de la operación.
Puede indicar al método que devuelva watch() la preimagen del documento, la versión completa del documento antes de los cambios, además de los campos modificados. Para incluir la preimagen en el evento de flujo de cambios, encadene el fullDocumentBeforeChange() método watch() a. Pase uno de los siguientes valores al fullDocumentBeforeChange() método:
FullDocumentBeforeChange.WHEN_AVAILABLEEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, este campo de evento de cambio tiene un valornull.FullDocumentBeforeChange.REQUIRED: El evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el servidor genera un error.
También puedes indicar al método watch() que devuelva la imagen posterior del documento, que es la versión completa del documento después de los cambios, además de los campos modificados. Para incluir la imagen posterior en el evento del flujo de cambios, concatenar el método fullDocument() al watch(). Pasa uno de los siguientes valores al método fullDocument():
FullDocument.UPDATE_LOOKUP: El evento de cambio incluye una copia de todo el documento cambiado a partir de cierto tiempo después del cambio.FullDocument.WHEN_AVAILABLEEl evento de cambio incluye una imagen posterior del documentos modificado para eventos de cambio. Si la imagen posterior no está disponible, este campo de evento de cambio tiene un valor denull.FullDocument.REQUIREDEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el servidor genera un error.
El siguiente ejemplo llama al método watch() en una colección e incluye la posterior imagen de documentos actualizados encadenando el método fullDocument():
val observer = LatchedObserver() collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) .subscribe(observer) observer.await()
Con la aplicación de flujo de cambios ejecutándose en un shell separado, actualizar un documento en la restaurants colección mediante el ejemplo de actualización anterior imprime un evento de cambio que se parece a la siguiente salida:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24", "coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard", "zipcode": "11697"}), (borough,BsonString{value='Queens'}), (cuisine,BsonString{value='Irish'}), (grades,BsonArray{values=[...]}), (name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}), (blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription= UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
Tip
Para obtener más información sobre pre-imágenes y post-imágenes, consulta Change Streams con preimágenes y postimágenes de documentos en el manual de MongoDB Server.
Información Adicional
Para saber más sobre las change streams, consulta Change Streams en el manual de MongoDB Server.
Documentación de la API
Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: