Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Abrir Change Streams

En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar cambios en tiempo real en tu base de datos. Un flujo de cambios es una funcionalidad del servidor de MongoDB que permite a tu aplicación suscribirse a cambios de datos en una única colección, base de datos o implementación.

Puede especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe su aplicación. Al conectarse a una implementación de MongoDB v6.0 o posterior, también puede configurar los eventos para que incluyan los datos del documento antes y después del cambio.

Aprenda a abrir y configurar sus flujos de cambios en las siguientes secciones:

  • Abre un flujo de cambios

  • Aplicar operadores de agregación al flujo de cambios

  • Incluir preimágenes y postimágenes

Tip

Atlas Stream Processing

Como alternativa a los flujos de cambios, puedes usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de base de datos, Atlas Stream Processing administra múltiples tipos de eventos de datos y ofrece capacidades avanzadas de procesamiento de datos. Para obtener más información sobre esta funcionalidad, consulta Atlas Stream Processing en la documentación de MongoDB Atlas.

Puedes abrir un flujo de cambios para suscribirte a tipos específicos de cambios de datos y producir eventos de cambios en tu aplicación.

Para abrir un flujo de cambios, llame al watch() método en una instancia de un MongoCollection, MongoDatabase o MongoClient.

Importante

Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página del manual del servidor MongoDB "Registro de operaciones del conjunto de réplicas".

El objeto en el que llama al método watch() determina el alcance de los eventos que escucha el flujo de cambios:

  • MongoCollection.watch() supervisa una colección.

  • MongoDatabase.watch() supervisa todas las colecciones en una base de datos.

  • MongoClient.watch() supervisa todos los cambios en la implementación de MongoDB conectada.

El watch() método toma una secuencia de agregación opcional como primer parámetro, que consiste en una lista de etapas que se pueden usar para filtrar y transformar la salida del evento de cambio, de la siguiente manera:

List<Bson> pipeline = List.of(
Aggregates.match(
Filters.in("operationType",
List.of("insert", "update"))),
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

Nota

Para los eventos de cambio de la operación de actualización, los flujos de cambio solo devuelven los campos modificados por defecto, en lugar de todo el documento actualizado. Puede configurar su flujo de cambio para que también devuelva la versión más reciente del documento llamando al método miembro fullDocument() del objeto ChangeStreamIterable con el valor FullDocument.UPDATE_LOOKUP, como se indica a continuación:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

El método watch() devuelve una instancia de ChangeStreamIterable, una interfaz que ofrece varios métodos para acceder, organizar y recorrer los resultados. ChangeStreamIterable también hereda métodos de su interfaz principal, MongoIterable que implementa la interfaz principal de Java Iterable.

Puede llamar a forEach() en ChangeStreamIterable para manejar eventos a medida que ocurren, o puede usar el método iterator() que devuelve una instancia MongoChangeStreamCursor que puede usar para recorrer los resultados.

Puedes llamar a los siguientes métodos en una instancia de MongoChangeStreamCursor:

  • hasNext(): Comprueba si hay más resultados

  • next(): Devuelve el siguiente documento de la colección

  • tryNext(): Devuelve inmediatamente el siguiente elemento disponible en el flujo de cambios o null

Importante

Iterar sobre el cursor bloquea el hilo actual

Iterar a través de un cursor usando forEach() o cualquier método iterator() bloquea el hilo actual mientras el flujo de cambios correspondiente escucha eventos. Si tu programa necesita seguir ejecutando otras lógicas, como procesar solicitudes o responder a la entrada del usuario, ten en cuenta la posibilidad de crear y escuchar tu flujo de cambios en un hilo separado.

A diferencia del MongoCursor devuelto por otras consultas, un MongoChangeStreamCursor asociado con un flujo de cambios espera hasta que llegue un evento de cambio antes de devolver un resultado de next(). Como resultado, las llamadas a next() que utilizan MongoChangeStreamCursor de un flujo de cambios nunca generan un java.util.NoSuchElementException.

Para configurar opciones para procesar los documentos devueltos por el flujo de cambios, utiliza métodos miembro del objeto ChangeStreamIterable devuelto por watch(). Consulta el enlace a la documentación de API ChangeStreamIterable al final de este ejemplo para más detalles sobre los métodos disponibles.

Este ejemplo muestra cómo abrir un flujo de cambios en la colección myColl e imprimir los eventos del flujo de cambios a medida que ocurren.

El driver almacena los eventos del flujo de cambios en una variable de tipo ChangeStreamIterable. En el siguiente ejemplo, especificamos que el driver debe completar el objeto ChangeStreamIterable con tipos de Document. Como resultado, el controlador almacena eventos individuales de flujo de cambios como objetos ChangeStreamDocument.

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

Una operación de inserción en la colección produce la siguiente salida:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

Nota

Configuración de ejemplo

Este ejemplo se conecta a una instancia de MongoDB utilizando un URI de conexión. Para obtener más información sobre cómo conectarse a tu instancia de MongoDB, consulta el Crea un MongoClient guía. Este ejemplo también utiliza la colección movies en la base de datos sample_mflix incluida en los conjuntos de datos de muestra de Atlas. Puedes cargarlos en tu base de datos en el nivel gratuito de MongoDB Atlas siguiendo MongoDB Empezar.

Este ejemplo muestra cómo abrir un flujo de cambios mediante el método watch. El archivo Watch.java llama al método watch() con una canalización como argumento para filtrar solo los eventos "insert" y "update". El archivo WatchCompanion.java inserta, actualiza y elimina un documento.

Para utilizar los siguientes ejemplos, ejecute los archivos en este orden:

  1. Ejecuta el argumento de entrada Watch.java Archivo.

  2. Ejecuta el argumento de entrada WatchCompanion.java Archivo.

Nota

El archivo Watch.java seguirá ejecutándose hasta que se ejecute el archivo WatchCompanion.java.

Watch.java:

/**
* This file demonstrates how to open a change stream by using the Java driver.
* It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
* to change events in the "movies" collection. The code uses a change stream with a pipeline
* to only filter for "insert" and "update" events.
*/
package org.example;
import java.util.Arrays;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.Aggregates;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
// Creates instructions to match insert and update operations
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
// Creates a change stream that receives change events for the specified operations
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
final int[] numberOfEvents = {0};
// Prints a message each time the change stream receives a change event, until it receives two events
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion.java:

// Performs CRUD operations to generate change events when run with the Watch application
package org.example;
import org.bson.Document;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.result.DeleteResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
// Inserts a sample document into the "movies" collection and print its ID
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Inserted document id: " + insertResult.getInsertedId());
// Updates the sample document and prints the number of modified documents
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
// Deletes the sample document and prints the number of deleted documents
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
// Prints a message if any exceptions occur during the operations
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

Las aplicaciones anteriores generarán la siguiente salida:

Watch.java capturará solo las operaciones insert y update, ya que la canalización de agregación filtra la operación delete:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

WatchCompanion imprimirá un resumen de las operaciones completadas:

Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

Para obtener más información sobre el método watch(), consulte la siguiente documentación de API:

Puedes pasar una pipeline de agregación como parámetro al método watch() para especificar qué eventos de cambio recibe el flujo de cambios.

Para aprender qué operadores de agregación admite la versión de tu MongoDB Server, consulta Modificar la salida de Change Stream.

El siguiente ejemplo de código muestra cómo puedes aplicar una pipeline de agregación para configurar tu flujo de cambios y recibir eventos de cambio solo para operaciones de inserción y actualización:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

Una operación de actualización en la colección produce el siguiente resultado:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

A partir de MongoDB 7.0, puede utilizar la etapa de agregación $changeStreamSplitLargeEvent para dividir los eventos que superen los 16 MB en fragmentos más pequeños.

Use $changeStreamSplitLargeEvent solo cuando sea estrictamente necesario. Por ejemplo, use $changeStreamSplitLargeEvent si su aplicación requiere imágenes completas del documento, tanto previas como posteriores, y genera eventos que superan los 16 MB.

La etapa $changeStreamSplitLargeEvent devuelve los fragmentos secuencialmente. Puedes acceder a los fragmentos utilizando un cursor de flujo de cambios. Cada fragmento incluye un objeto SplitEvent que contiene los siguientes campos:

Campo
Descripción

fragment

El índice del fragmento, comenzando en 1

of

El número total de fragmentos que componen el evento dividido

El siguiente ejemplo modifica su flujo de cambios mediante el uso de la etapa de agregación $changeStreamSplitLargeEvent para dividir eventos grandes:

ChangeStreamIterable<Document> changeStream = collection.watch(
List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

Nota

Solo puedes tener una $changeStreamSplitLargeEvent etapa en tu pipeline de agregación y debe ser la última etapa en el pipeline.

Puede llamar al método getSplitEvent() en el cursor de su flujo de cambio para acceder a SplitEvent como se muestra en el siguiente ejemplo:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

Para obtener más información sobre la etapa de agregación $changeStreamSplitLargeEvent, consulta la $changeStreamSplitLargeEvent documentación del servidor.

Puedes configurar el evento de cambio para que contenga u omita los siguientes datos:

  • La preimagen, un documento que representa la versión del documento antes de la operación, si existe

  • La postimagen, un documento que representa la versión del documento después de la operación, si existe

Importante

Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.

Para recibir eventos de flujo de cambios que incluyan una preimagen o una postimagen, debes realizar las siguientes acciones:

Para utilizar el controlador para crear una colección con las opciones de pre-imagen y post-imagen activadas, especifica una instancia de ChangeStreamPreAndPostImagesOptions y llama al método createCollection() como se muestra en el siguiente ejemplo:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

Puedes cambiar la opción de pre-imagen y post-imagen en una colección existente ejecutando el comando collMod desde la Shell de MongoDB. Para aprender cómo realizar esta operación, consulte la entrada sobre collMod en el manual del servidor.

Advertencia

Si habilitó imágenes previas o posteriores en una colección, modificar estas configuraciones con collMod puede provocar que los flujos de cambio existentes en esa colección fallen.

El siguiente ejemplo de código muestra cómo se puede configurar un flujo de cambios en la colección myColl para incluir la preimagen y mostrar cualquier evento de cambio:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

El ejemplo anterior configura el flujo de cambios para que use la opción FullDocumentBeforeChange.REQUIRED. Esta opción configura el flujo de cambios para que requiera preimágenes para los eventos de cambio de reemplazo, actualización y eliminación. Si la pre-imagen no está disponible, el controlador genera un error.

Supongamos que actualizas el valor del campo amount en un documento de 150 a 2000. Este evento de cambio genera la siguiente salida:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

Para ver una lista de opciones, consulta la documentación de la API FullDocumentBeforeChange.

El siguiente ejemplo de código muestra cómo se puede configurar un flujo de cambios en la colección myColl para incluir la preimagen y mostrar cualquier evento de cambio:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

El ejemplo anterior configura el flujo de cambios para que use la opción FullDocument.WHEN_AVAILABLE. Esta opción configura el flujo de cambios para que devuelva la post-imagen del documento modificado para eventos de cambio de reemplazo y actualización, si está disponible.

Supongamos que actualiza el valor del campo color de un documento de "purple" a "pink". El evento de cambio genera el siguiente resultado:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

Para obtener una lista de opciones, consulte la documentación de la API FullDocument.

Para obtener más información sobre los métodos y clases utilizados para administrar flujos de cambios, consulte la siguiente documentación de API:

Volver

Monitoring