Docs Menu
Docs Home
/ /

Flujos de cambio abiertos

En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar cambios en tiempo real en tu base de datos. Un flujo de cambios es una funcionalidad del servidor de MongoDB que permite a tu aplicación suscribirse a cambios de datos en una única colección, base de datos o implementación.

Puede especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe su aplicación. Al conectarse a una implementación de MongoDB v6.0 o posterior, también puede configurar los eventos para que incluyan los datos del documento antes y después del cambio.

Aprenda a abrir y configurar sus flujos de cambio en las siguientes secciones:

  • Abre un flujo de cambios

  • Aplicar operadores de agregación a su flujo de cambios

  • Incluir preimágenes y postimágenes

Tip

Atlas Stream Processing

Como alternativa a los flujos de cambios, puede usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Atlas Stream Processing en la documentación de MongoDB Atlas.

Puede abrir un flujo de cambios para suscribirse a tipos específicos de cambios de datos y producir eventos de cambio en su aplicación.

Para abrir un flujo de cambios, llame al watch() método en una instancia de MongoCollection, MongoDatabase o MongoClient.

Importante

Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página del manual del servidor MongoDB "Registro de operaciones del conjunto de réplicas".

El objeto en el que llama al método watch() determina el alcance de los eventos que escucha el flujo de cambios:

  • MongoCollection.watch() supervisa una colección.

  • MongoDatabase.watch() supervisa todas las colecciones en una base de datos.

  • MongoClient.watch() supervisa todos los cambios en la implementación de MongoDB conectada.

El watch() método toma una secuencia de agregación opcional como primer parámetro, que consiste en una lista de etapas que se pueden usar para filtrar y transformar la salida del evento de cambio, de la siguiente manera:

List<Bson> pipeline = List.of(
Aggregates.match(
Filters.in("operationType",
List.of("insert", "update"))),
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

Nota

Para los eventos de cambio de la operación de actualización, los flujos de cambio solo devuelven los campos modificados por defecto, en lugar de todo el documento actualizado. Puede configurar su flujo de cambio para que también devuelva la versión más reciente del documento llamando al método miembro fullDocument() del objeto ChangeStreamIterable con el valor FullDocument.UPDATE_LOOKUP, como se indica a continuación:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

El método watch() devuelve una instancia de ChangeStreamIterable, una interfaz que ofrece varios métodos para acceder, organizar y recorrer los resultados. ChangeStreamIterable también hereda métodos de su interfaz principal, MongoIterable que implementa la interfaz principal de Java Iterable.

Puede llamar a forEach() en ChangeStreamIterable para manejar eventos a medida que ocurren, o puede usar el método iterator() que devuelve una instancia MongoChangeStreamCursor que puede usar para recorrer los resultados.

Puede llamar a los siguientes métodos en una instancia MongoChangeStreamCursor:

  • hasNext(): Comprueba si hay más resultados

  • next(): Devuelve el siguiente documento de la colección

  • tryNext(): Devuelve inmediatamente el siguiente elemento disponible en el flujo de cambio o null

Importante

Iterar sobre el cursor bloquea el hilo actual

Iterar un cursor usando forEach() o cualquier método iterator() bloquea el hilo actual mientras el flujo de cambios correspondiente escucha eventos. Si su programa necesita continuar ejecutando otra lógica, como procesar solicitudes o responder a la entrada del usuario, considere crear y escuchar su flujo de cambios en un hilo separado.

A diferencia del MongoCursor devuelto por otras consultas, un MongoChangeStreamCursor asociado a un flujo de cambios espera hasta que llegue un evento de cambio antes de devolver un resultado de next(). Por lo tanto, las llamadas a next() que utilizan el MongoChangeStreamCursor de un flujo de cambios nunca generan un java.util.NoSuchElementException.

Para configurar opciones para procesar los documentos devueltos por el flujo de cambios, utiliza métodos miembro del objeto ChangeStreamIterable devuelto por watch(). Consulta el enlace a la documentación de API ChangeStreamIterable al final de este ejemplo para más detalles sobre los métodos disponibles.

Este ejemplo muestra cómo abrir un flujo de cambios en la colección myColl e imprimir eventos del flujo de cambios a medida que ocurren.

El controlador almacena eventos de flujo de cambios en una variable de tipo ChangeStreamIterable. En el siguiente ejemplo, especificamos que el controlador debe rellenar el objeto ChangeStreamIterable con tipos Document. Como resultado, el controlador almacena eventos de flujo de cambios individuales como objetos ChangeStreamDocument.

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

Una operación de inserción en la colección produce el siguiente resultado:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

Nota

Configuración de ejemplo

Este ejemplo se conecta a una instancia de MongoDB mediante una URI de conexión. Para obtener más información sobre cómo conectarse a su instancia de MongoDB, consulte Guía para crear un MongoClient. Este ejemplo también utiliza la movies colección de la sample_mflix base de datos incluida en los conjuntos de datos de ejemplo de Atlas. Puede cargarlos en su base de datos en la versión gratuita de MongoDB Atlas siguiendo la guía de introducción de MongoDB.

Este ejemplo muestra cómo abrir un flujo de cambios mediante el método watch. El archivo Watch.java llama al método watch() con una canalización como argumento para filtrar solo los eventos "insert" y "update". El archivo WatchCompanion.java inserta, actualiza y elimina un documento.

Para utilizar los siguientes ejemplos, ejecute los archivos en este orden:

  1. Ejecute el archivo Watch.java.

  2. Ejecute el archivo WatchCompanion.java.

Nota

El archivo Watch.java continuará ejecutándose hasta que se ejecute el archivo WatchCompanion.java.

Watch.java:

/**
* This file demonstrates how to open a change stream by using the Java driver.
* It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
* to change events in the "movies" collection. The code uses a change stream with a pipeline
* to only filter for "insert" and "update" events.
*/
package org.example;
import java.util.Arrays;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.Aggregates;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
// Creates instructions to match insert and update operations
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
// Creates a change stream that receives change events for the specified operations
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
final int[] numberOfEvents = {0};
// Prints a message each time the change stream receives a change event, until it receives two events
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion.java:

// Performs CRUD operations to generate change events when run with the Watch application
package org.example;
import org.bson.Document;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.result.DeleteResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
// Inserts a sample document into the "movies" collection and print its ID
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Inserted document id: " + insertResult.getInsertedId());
// Updates the sample document and prints the number of modified documents
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
// Deletes the sample document and prints the number of deleted documents
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
// Prints a message if any exceptions occur during the operations
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

Las aplicaciones anteriores generarán la siguiente salida:

Watch.java capturará solo las operaciones insert y update, ya que la canalización de agregación filtra la operación delete:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

WatchCompanion imprimirá un resumen de las operaciones completadas:

Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

Para obtener más información sobre el método watch(), consulte la siguiente documentación de API:

Puede pasar una canalización de agregación como parámetro al método watch() para especificar qué eventos de cambio recibe el flujo de cambios.

Para saber qué operadores de agregación admite su versión de MongoDB Server, consulte Modificar la salida del flujo de cambios.

El siguiente ejemplo de código muestra cómo puede aplicar una canalización de agregación para configurar su flujo de cambios para recibir eventos de cambio solo para operaciones de inserción y actualización:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

Una operación de actualización en la colección produce el siguiente resultado:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

A partir de MongoDB 7.0, puede utilizar la etapa de agregación $changeStreamSplitLargeEvent para dividir eventos que superen los 16 MB en fragmentos más pequeños.

Use $changeStreamSplitLargeEvent solo cuando sea estrictamente necesario. Por ejemplo, use $changeStreamSplitLargeEvent si su aplicación requiere imágenes completas del documento, tanto previas como posteriores, y genera eventos que superan los 16 MB.

La etapa $changeStreamSplitLargeEvent devuelve los fragmentos secuencialmente. Puede acceder a ellos mediante un cursor de flujo de cambios. Cada fragmento incluye un objeto SplitEvent con los siguientes campos:

Campo
Descripción

fragment

El índice del fragmento, comenzando en 1

of

El número total de fragmentos que componen el evento dividido

El siguiente ejemplo modifica su flujo de cambios mediante el uso de la etapa de agregación $changeStreamSplitLargeEvent para dividir eventos grandes:

ChangeStreamIterable<Document> changeStream = collection.watch(
List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

Nota

Solo puede tener una etapa $changeStreamSplitLargeEvent en su canal de agregación, y debe ser la última etapa del canal.

Puede llamar al método getSplitEvent() en el cursor de su flujo de cambio para acceder a SplitEvent como se muestra en el siguiente ejemplo:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

Para obtener más información sobre la $changeStreamSplitLargeEvent etapa de agregación, consulte la documentación del servidor $changeStreamSplitLargeEvent.

Puede configurar el evento de cambio para que contenga u omita los siguientes datos:

  • La preimagen, un documento que representa la versión del documento antes de la operación, si existe

  • La postimagen, un documento que representa la versión del documento después de la operación, si existe

Importante

Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.

Para recibir eventos de flujo de cambios que incluyan una imagen previa o posterior, debe realizar las siguientes acciones:

Para utilizar el controlador para crear una colección con las opciones de imagen previa y imagen posterior habilitadas, especifique una instancia de ChangeStreamPreAndPostImagesOptions y llame al método createCollection() como se muestra en el siguiente ejemplo:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

Puedes cambiar la opción de pre-imagen y post-imagen en una colección existente ejecutando el comando collMod desde la Shell de MongoDB. Para aprender cómo realizar esta operación, consulte la entrada sobre collMod en el manual del servidor.

Advertencia

Si habilitó imágenes previas o posteriores en una colección, modificar estas configuraciones con collMod puede provocar que los flujos de cambio existentes en esa colección fallen.

El siguiente ejemplo de código muestra cómo puede configurar un flujo de cambios en la colección myColl para incluir la imagen previa y generar cualquier evento de cambio:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

El ejemplo anterior configura el flujo de cambios para usar la opción FullDocumentBeforeChange.REQUIRED. Esta opción configura el flujo de cambios para requerir imágenes previas para los eventos de reemplazo, actualización y eliminación. Si la imagen previa no está disponible, el controlador genera un error.

Supongamos que actualiza el valor del campo amount de un documento de 150 a 2000. Este evento de cambio genera el siguiente resultado:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

Para obtener una lista de opciones, consulte la documentación de la API FullDocumentBeforeChange.

El siguiente ejemplo de código muestra cómo puede configurar un flujo de cambios en la colección myColl para incluir la imagen previa y generar cualquier evento de cambio:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

El ejemplo anterior configura el flujo de cambios para usar la opción FullDocument.WHEN_AVAILABLE. Esta opción configura el flujo de cambios para devolver la imagen posterior del documento modificado para los eventos de reemplazo y actualización, si está disponible.

Supongamos que actualiza el valor del campo color de un documento de "purple" a "pink". El evento de cambio genera el siguiente resultado:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

Para obtener una lista de opciones, consulte la documentación de la API FullDocument.

Para obtener más información sobre los métodos y clases utilizados para administrar flujos de cambios, consulte la siguiente documentación de API:

Volver

Monitoring