Overview
En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar cambios en tiempo real en tu base de datos. Un flujo de cambios es una funcionalidad del servidor de MongoDB que permite a tu aplicación suscribirse a cambios de datos en una única colección, base de datos o implementación.
Puede especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe su aplicación. Al conectarse a una implementación de MongoDB v6.0 o posterior, también puede configurar los eventos para que incluyan los datos del documento antes y después del cambio.
Aprenda a abrir y configurar sus flujos de cambios en las siguientes secciones:
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puedes usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de base de datos, Atlas Stream Processing administra múltiples tipos de eventos de datos y ofrece capacidades avanzadas de procesamiento de datos. Para obtener más información sobre esta funcionalidad, consulta Atlas Stream Processing en la documentación de MongoDB Atlas.
Abre un flujo de cambios
Puedes abrir un flujo de cambios para suscribirte a tipos específicos de cambios de datos y producir eventos de cambios en tu aplicación.
Selecciona un alcance para observar
Para abrir un flujo de cambios, llame al watch() método en una instancia de un MongoCollection, MongoDatabase o MongoClient.
Importante
Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página del manual del servidor MongoDB "Registro de operaciones del conjunto de réplicas".
El objeto en el que llama al método watch() determina el alcance de los eventos que escucha el flujo de cambios:
MongoCollection.watch()supervisa una colección.MongoDatabase.watch()supervisa todas las colecciones en una base de datos.MongoClient.watch()supervisa todos los cambios en la implementación de MongoDB conectada.
Filtrar los eventos
El watch() método toma una secuencia de agregación opcional como primer parámetro, que consiste en una lista de etapas que se pueden usar para filtrar y transformar la salida del evento de cambio, de la siguiente manera:
List<Bson> pipeline = List.of( Aggregates.match( Filters.in("operationType", List.of("insert", "update"))), Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
Nota
Para los eventos de cambio de la operación de actualización, los flujos de cambio solo devuelven los campos modificados por defecto, en lugar de todo el documento actualizado. Puede configurar su flujo de cambio para que también devuelva la versión más reciente del documento llamando al método miembro fullDocument() del objeto ChangeStreamIterable con el valor FullDocument.UPDATE_LOOKUP, como se indica a continuación:
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
Gestionar la salida
El método watch() devuelve una instancia de ChangeStreamIterable, una interfaz que ofrece varios métodos para acceder, organizar y recorrer los resultados. ChangeStreamIterable también hereda métodos de su interfaz principal, MongoIterable que implementa la interfaz principal de Java Iterable.
Puede llamar a forEach() en ChangeStreamIterable para manejar eventos a medida que ocurren, o puede usar el método iterator() que devuelve una instancia MongoChangeStreamCursor que puede usar para recorrer los resultados.
Puedes llamar a los siguientes métodos en una instancia de MongoChangeStreamCursor:
hasNext(): Comprueba si hay más resultadosnext(): Devuelve el siguiente documento de la coleccióntryNext(): Devuelve inmediatamente el siguiente elemento disponible en el flujo de cambios onull
Importante
Iterar sobre el cursor bloquea el hilo actual
Iterar a través de un cursor usando forEach() o cualquier método iterator() bloquea el hilo actual mientras el flujo de cambios correspondiente escucha eventos. Si tu programa necesita seguir ejecutando otras lógicas, como procesar solicitudes o responder a la entrada del usuario, ten en cuenta la posibilidad de crear y escuchar tu flujo de cambios en un hilo separado.
A diferencia del MongoCursor devuelto por otras consultas, un MongoChangeStreamCursor asociado con un flujo de cambios espera hasta que llegue un evento de cambio antes de devolver un resultado de next(). Como resultado, las llamadas a next() que utilizan MongoChangeStreamCursor de un flujo de cambios nunca generan un java.util.NoSuchElementException.
Para configurar opciones para procesar los documentos devueltos por el flujo de cambios, utiliza métodos miembro del objeto ChangeStreamIterable devuelto por watch(). Consulta el enlace a la documentación de API ChangeStreamIterable al final de este ejemplo para más detalles sobre los métodos disponibles.
Ejemplo
Este ejemplo muestra cómo abrir un flujo de cambios en la colección myColl e imprimir los eventos del flujo de cambios a medida que ocurren.
El driver almacena los eventos del flujo de cambios en una variable de tipo ChangeStreamIterable. En el siguiente ejemplo, especificamos que el driver debe completar el objeto ChangeStreamIterable con tipos de Document. Como resultado, el controlador almacena eventos individuales de flujo de cambios como objetos ChangeStreamDocument.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
Una operación de inserción en la colección produce la siguiente salida:
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Ejemplo de visualización: Archivos completos
Nota
Configuración de ejemplo
Este ejemplo se conecta a una instancia de MongoDB utilizando un URI de conexión. Para obtener más información sobre cómo conectarse a tu instancia de MongoDB, consulta el Crea un MongoClient guía. Este ejemplo también utiliza la colección movies en la base de datos sample_mflix incluida en los conjuntos de datos de muestra de Atlas. Puedes cargarlos en tu base de datos en el nivel gratuito de MongoDB Atlas siguiendo MongoDB Empezar.
Este ejemplo muestra cómo abrir un flujo de cambios mediante el método watch. El archivo Watch.java llama al método watch() con una canalización como argumento para filtrar solo los eventos "insert" y "update". El archivo WatchCompanion.java inserta, actualiza y elimina un documento.
Para utilizar los siguientes ejemplos, ejecute los archivos en este orden:
Ejecuta el argumento de entrada
Watch.javaArchivo.Ejecuta el argumento de entrada
WatchCompanion.javaArchivo.
Nota
El archivo Watch.java seguirá ejecutándose hasta que se ejecute el archivo WatchCompanion.java.
Watch.java:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package org.example; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.Aggregates; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion.java:
// Performs CRUD operations to generate change events when run with the Watch application package org.example; import org.bson.Document; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.model.Updates; import com.mongodb.client.result.UpdateResult; import com.mongodb.client.result.DeleteResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
Resultado del ejemplo de archivo completo
Las aplicaciones anteriores generarán la siguiente salida:
Watch.java capturará solo las operaciones insert y update, ya que la canalización de agregación filtra la operación delete:
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
WatchCompanion imprimirá un resumen de las operaciones completadas:
Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
Para obtener más información sobre el método watch(), consulte la siguiente documentación de API:
Aplicar operadores de agregación al flujo de cambios
Puedes pasar una pipeline de agregación como parámetro al método watch() para especificar qué eventos de cambio recibe el flujo de cambios.
Para aprender qué operadores de agregación admite la versión de tu MongoDB Server, consulta Modificar la salida de Change Stream.
Ejemplo
El siguiente ejemplo de código muestra cómo puedes aplicar una pipeline de agregación para configurar tu flujo de cambios y recibir eventos de cambio solo para operaciones de inserción y actualización:
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
Una operación de actualización en la colección produce el siguiente resultado:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
Dividir eventos de flujo de cambios grandes
A partir de MongoDB 7.0, puede utilizar la etapa de agregación $changeStreamSplitLargeEvent para dividir los eventos que superen los 16 MB en fragmentos más pequeños.
Use $changeStreamSplitLargeEvent solo cuando sea estrictamente necesario. Por ejemplo, use $changeStreamSplitLargeEvent si su aplicación requiere imágenes completas del documento, tanto previas como posteriores, y genera eventos que superan los 16 MB.
La etapa $changeStreamSplitLargeEvent devuelve los fragmentos secuencialmente. Puedes acceder a los fragmentos utilizando un cursor de flujo de cambios. Cada fragmento incluye un objeto SplitEvent que contiene los siguientes campos:
Campo | Descripción |
|---|---|
| El índice del fragmento, comenzando en |
| El número total de fragmentos que componen el evento dividido |
El siguiente ejemplo modifica su flujo de cambios mediante el uso de la etapa de agregación $changeStreamSplitLargeEvent para dividir eventos grandes:
ChangeStreamIterable<Document> changeStream = collection.watch( List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
Nota
Solo puedes tener una $changeStreamSplitLargeEvent etapa en tu pipeline de agregación y debe ser la última etapa en el pipeline.
Puede llamar al método getSplitEvent() en el cursor de su flujo de cambio para acceder a SplitEvent como se muestra en el siguiente ejemplo:
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
Para obtener más información sobre la etapa de agregación $changeStreamSplitLargeEvent, consulta la $changeStreamSplitLargeEvent documentación del servidor.
Incluir preimágenes y postimágenes
Puedes configurar el evento de cambio para que contenga u omita los siguientes datos:
La preimagen, un documento que representa la versión del documento antes de la operación, si existe
La postimagen, un documento que representa la versión del documento después de la operación, si existe
Importante
Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.
Para recibir eventos de flujo de cambios que incluyan una preimagen o una postimagen, debes realizar las siguientes acciones:
Habilita las preimágenes y las postimágenes para la colección en tu implementación de MongoDB.
Tip
Para obtener instrucciones sobre cómo habilitar imágenes pre/post en la implementación, consulte Change Streams con imágenes pre/post en documentos en el manual del servidor.
Para aprender a indicar al driver que cree una colección con imágenes previas y posteriores habilitadas, consulte la sección Crear una colección con imágenes previas y posteriores habilitadas.
Configura tu flujo de cambios para recuperar imágenes previas, imágenes posteriores o ambas.
Tip
Para configurar su flujo de cambios para registrar la preimagen en los eventos de cambio, consulte Ejemplo de configuración de preimagen..
Para configurar su flujo de cambios para registrar la imagen posterior en eventos de cambio, consulte el Ejemplo de configuración de imagen posterior.
Crear una colección con imágenes previas y posteriores habilitadas
Para utilizar el controlador para crear una colección con las opciones de pre-imagen y post-imagen activadas, especifica una instancia de ChangeStreamPreAndPostImagesOptions y llama al método createCollection() como se muestra en el siguiente ejemplo:
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
Puedes cambiar la opción de pre-imagen y post-imagen en una colección existente ejecutando el comando collMod desde la Shell de MongoDB. Para aprender cómo realizar esta operación, consulte la entrada sobre collMod en el manual del servidor.
Advertencia
Si habilitó imágenes previas o posteriores en una colección, modificar estas configuraciones con collMod puede provocar que los flujos de cambio existentes en esa colección fallen.
Ejemplo de configuración previa a la imagen
El siguiente ejemplo de código muestra cómo se puede configurar un flujo de cambios en la colección myColl para incluir la preimagen y mostrar cualquier evento de cambio:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
El ejemplo anterior configura el flujo de cambios para que use la opción FullDocumentBeforeChange.REQUIRED. Esta opción configura el flujo de cambios para que requiera preimágenes para los eventos de cambio de reemplazo, actualización y eliminación. Si la pre-imagen no está disponible, el controlador genera un error.
Supongamos que actualizas el valor del campo amount en un documento de 150 a 2000. Este evento de cambio genera la siguiente salida:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
Para ver una lista de opciones, consulta la documentación de la API FullDocumentBeforeChange.
Ejemplo de configuración posterior a la imagen
El siguiente ejemplo de código muestra cómo se puede configurar un flujo de cambios en la colección myColl para incluir la preimagen y mostrar cualquier evento de cambio:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
El ejemplo anterior configura el flujo de cambios para que use la opción FullDocument.WHEN_AVAILABLE. Esta opción configura el flujo de cambios para que devuelva la post-imagen del documento modificado para eventos de cambio de reemplazo y actualización, si está disponible.
Supongamos que actualiza el valor del campo color de un documento de "purple" a "pink". El evento de cambio genera el siguiente resultado:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
Para obtener una lista de opciones, consulte la documentación de la API FullDocument.
Información Adicional
Documentación de la API
Para obtener más información sobre los métodos y clases utilizados para administrar flujos de cambios, consulte la siguiente documentación de API: