Overview
En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar cambios en tiempo real en tu base de datos. Un flujo de cambios es una funcionalidad del servidor de MongoDB que permite a tu aplicación suscribirse a cambios de datos en una única colección, base de datos o implementación.
Puede especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe su aplicación. Al conectarse a una implementación de MongoDB v6.0 o posterior, también puede configurar los eventos para que incluyan los datos del documento antes y después del cambio.
Aprenda a abrir y configurar sus flujos de cambio en las siguientes secciones:
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puede usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Atlas Stream Processing en la documentación de MongoDB Atlas.
Abre un flujo de cambios
Puede abrir un flujo de cambios para suscribirse a tipos específicos de cambios de datos y producir eventos de cambio en su aplicación.
Selecciona un alcance para observar
Para abrir un flujo de cambios, llame al watch() método en una instancia de MongoCollection, MongoDatabase o MongoClient.
Importante
Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página del manual del servidor MongoDB "Registro de operaciones del conjunto de réplicas".
El objeto en el que llama al método watch() determina el alcance de los eventos que escucha el flujo de cambios:
MongoCollection.watch()supervisa una colección.MongoDatabase.watch()supervisa todas las colecciones en una base de datos.MongoClient.watch()supervisa todos los cambios en la implementación de MongoDB conectada.
Filtrar los eventos
El watch() método toma una secuencia de agregación opcional como primer parámetro, que consiste en una lista de etapas que se pueden usar para filtrar y transformar la salida del evento de cambio, de la siguiente manera:
List<Bson> pipeline = List.of( Aggregates.match( Filters.in("operationType", List.of("insert", "update"))), Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
Nota
Para los eventos de cambio de la operación de actualización, los flujos de cambio solo devuelven los campos modificados por defecto, en lugar de todo el documento actualizado. Puede configurar su flujo de cambio para que también devuelva la versión más reciente del documento llamando al método miembro fullDocument() del objeto ChangeStreamIterable con el valor FullDocument.UPDATE_LOOKUP, como se indica a continuación:
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
Gestionar la salida
El método watch() devuelve una instancia de ChangeStreamIterable, una interfaz que ofrece varios métodos para acceder, organizar y recorrer los resultados. ChangeStreamIterable también hereda métodos de su interfaz principal, MongoIterable que implementa la interfaz principal de Java Iterable.
Puede llamar a forEach() en ChangeStreamIterable para manejar eventos a medida que ocurren, o puede usar el método iterator() que devuelve una instancia MongoChangeStreamCursor que puede usar para recorrer los resultados.
Puede llamar a los siguientes métodos en una instancia MongoChangeStreamCursor:
hasNext(): Comprueba si hay más resultadosnext(): Devuelve el siguiente documento de la coleccióntryNext(): Devuelve inmediatamente el siguiente elemento disponible en el flujo de cambio onull
Importante
Iterar sobre el cursor bloquea el hilo actual
Iterar un cursor usando forEach() o cualquier método iterator() bloquea el hilo actual mientras el flujo de cambios correspondiente escucha eventos. Si su programa necesita continuar ejecutando otra lógica, como procesar solicitudes o responder a la entrada del usuario, considere crear y escuchar su flujo de cambios en un hilo separado.
A diferencia del MongoCursor devuelto por otras consultas, un MongoChangeStreamCursor asociado a un flujo de cambios espera hasta que llegue un evento de cambio antes de devolver un resultado de next(). Por lo tanto, las llamadas a next() que utilizan el MongoChangeStreamCursor de un flujo de cambios nunca generan un java.util.NoSuchElementException.
Para configurar opciones para procesar los documentos devueltos por el flujo de cambios, utiliza métodos miembro del objeto ChangeStreamIterable devuelto por watch(). Consulta el enlace a la documentación de API ChangeStreamIterable al final de este ejemplo para más detalles sobre los métodos disponibles.
Ejemplo
Este ejemplo muestra cómo abrir un flujo de cambios en la colección myColl e imprimir eventos del flujo de cambios a medida que ocurren.
El controlador almacena eventos de flujo de cambios en una variable de tipo ChangeStreamIterable. En el siguiente ejemplo, especificamos que el controlador debe rellenar el objeto ChangeStreamIterable con tipos Document. Como resultado, el controlador almacena eventos de flujo de cambios individuales como objetos ChangeStreamDocument.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
Una operación de inserción en la colección produce el siguiente resultado:
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Ejemplo de visualización: Archivos completos
Nota
Configuración de ejemplo
Este ejemplo se conecta a una instancia de MongoDB mediante una URI de conexión. Para obtener más información sobre cómo conectarse a su instancia de MongoDB, consulte Guía para crear un MongoClient. Este ejemplo también utiliza la movies colección de la sample_mflix base de datos incluida en los conjuntos de datos de ejemplo de Atlas. Puede cargarlos en su base de datos en la versión gratuita de MongoDB Atlas siguiendo la guía de introducción de MongoDB.
Este ejemplo muestra cómo abrir un flujo de cambios mediante el método watch. El archivo Watch.java llama al método watch() con una canalización como argumento para filtrar solo los eventos "insert" y "update". El archivo WatchCompanion.java inserta, actualiza y elimina un documento.
Para utilizar los siguientes ejemplos, ejecute los archivos en este orden:
Ejecute el archivo
Watch.java.Ejecute el archivo
WatchCompanion.java.
Nota
El archivo Watch.java continuará ejecutándose hasta que se ejecute el archivo WatchCompanion.java.
Watch.java:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package org.example; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.Aggregates; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion.java:
// Performs CRUD operations to generate change events when run with the Watch application package org.example; import org.bson.Document; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.model.Updates; import com.mongodb.client.result.UpdateResult; import com.mongodb.client.result.DeleteResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
Ejemplo de salida de archivo completo
Las aplicaciones anteriores generarán la siguiente salida:
Watch.java capturará solo las operaciones insert y update, ya que la canalización de agregación filtra la operación delete:
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
WatchCompanion imprimirá un resumen de las operaciones completadas:
Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
Para obtener más información sobre el método watch(), consulte la siguiente documentación de API:
Aplicar operadores de agregación a su flujo de cambios
Puede pasar una canalización de agregación como parámetro al método watch() para especificar qué eventos de cambio recibe el flujo de cambios.
Para saber qué operadores de agregación admite su versión de MongoDB Server, consulte Modificar la salida del flujo de cambios.
Ejemplo
El siguiente ejemplo de código muestra cómo puede aplicar una canalización de agregación para configurar su flujo de cambios para recibir eventos de cambio solo para operaciones de inserción y actualización:
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
Una operación de actualización en la colección produce el siguiente resultado:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Dividir eventos de flujo de cambios grandes
A partir de MongoDB 7.0, puede utilizar la etapa de agregación $changeStreamSplitLargeEvent para dividir eventos que superen los 16 MB en fragmentos más pequeños.
Use $changeStreamSplitLargeEvent solo cuando sea estrictamente necesario. Por ejemplo, use $changeStreamSplitLargeEvent si su aplicación requiere imágenes completas del documento, tanto previas como posteriores, y genera eventos que superan los 16 MB.
La etapa $changeStreamSplitLargeEvent devuelve los fragmentos secuencialmente. Puede acceder a ellos mediante un cursor de flujo de cambios. Cada fragmento incluye un objeto SplitEvent con los siguientes campos:
Campo | Descripción |
|---|---|
| El índice del fragmento, comenzando en |
| El número total de fragmentos que componen el evento dividido |
El siguiente ejemplo modifica su flujo de cambios mediante el uso de la etapa de agregación $changeStreamSplitLargeEvent para dividir eventos grandes:
ChangeStreamIterable<Document> changeStream = collection.watch( List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
Nota
Solo puede tener una etapa $changeStreamSplitLargeEvent en su canal de agregación, y debe ser la última etapa del canal.
Puede llamar al método getSplitEvent() en el cursor de su flujo de cambio para acceder a SplitEvent como se muestra en el siguiente ejemplo:
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
Para obtener más información sobre la $changeStreamSplitLargeEvent etapa de agregación, consulte la documentación del servidor $changeStreamSplitLargeEvent.
Incluir preimágenes y postimágenes
Puede configurar el evento de cambio para que contenga u omita los siguientes datos:
La preimagen, un documento que representa la versión del documento antes de la operación, si existe
La postimagen, un documento que representa la versión del documento después de la operación, si existe
Importante
Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.
Para recibir eventos de flujo de cambios que incluyan una imagen previa o posterior, debe realizar las siguientes acciones:
Habilite imágenes previas y posteriores para la colección en su implementación de MongoDB.
Tip
Para saber cómo habilitar imágenes previas y posteriores en su implementación, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual del servidor.
Para aprender a indicar al driver que cree una colección con imágenes previas y posteriores habilitadas, consulte la sección Crear una colección con imágenes previas y posteriores habilitadas.
Configure su flujo de cambios para recuperar una o ambas imágenes previas o posteriores.
Tip
Para configurar su flujo de cambios para registrar la imagen previa en eventos de cambio, consulte el Ejemplo de configuración de imagen previa.
Para configurar su flujo de cambios para registrar la imagen posterior en eventos de cambio, consulte el Ejemplo de configuración de imagen posterior.
Crear una colección con imágenes previas y posteriores habilitadas
Para utilizar el controlador para crear una colección con las opciones de imagen previa y imagen posterior habilitadas, especifique una instancia de ChangeStreamPreAndPostImagesOptions y llame al método createCollection() como se muestra en el siguiente ejemplo:
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
Puedes cambiar la opción de pre-imagen y post-imagen en una colección existente ejecutando el comando collMod desde la Shell de MongoDB. Para aprender cómo realizar esta operación, consulte la entrada sobre collMod en el manual del servidor.
Advertencia
Si habilitó imágenes previas o posteriores en una colección, modificar estas configuraciones con collMod puede provocar que los flujos de cambio existentes en esa colección fallen.
Ejemplo de configuración de preimagen
El siguiente ejemplo de código muestra cómo puede configurar un flujo de cambios en la colección myColl para incluir la imagen previa y generar cualquier evento de cambio:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
El ejemplo anterior configura el flujo de cambios para usar la opción FullDocumentBeforeChange.REQUIRED. Esta opción configura el flujo de cambios para requerir imágenes previas para los eventos de reemplazo, actualización y eliminación. Si la imagen previa no está disponible, el controlador genera un error.
Supongamos que actualiza el valor del campo amount de un documento de 150 a 2000. Este evento de cambio genera el siguiente resultado:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
Para obtener una lista de opciones, consulte la documentación de la API FullDocumentBeforeChange.
Ejemplo de configuración posterior a la imagen
El siguiente ejemplo de código muestra cómo puede configurar un flujo de cambios en la colección myColl para incluir la imagen previa y generar cualquier evento de cambio:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
El ejemplo anterior configura el flujo de cambios para usar la opción FullDocument.WHEN_AVAILABLE. Esta opción configura el flujo de cambios para devolver la imagen posterior del documento modificado para los eventos de reemplazo y actualización, si está disponible.
Supongamos que actualiza el valor del campo color de un documento de "purple" a "pink". El evento de cambio genera el siguiente resultado:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
Para obtener una lista de opciones, consulte la documentación de la API FullDocument.
Información Adicional
Documentación de la API
Para obtener más información sobre los métodos y clases utilizados para administrar flujos de cambios, consulte la siguiente documentación de API: