Para agentes de IA: hay un índice de documentación disponible en https://www.mongodb.com/es/docs/llms.txt — versiones en markdown de todas las páginas están disponibles agregando .md a cualquier ruta URL.
Docs Menu

Flujos de cambio

En esta guía, puedes aprender a usar un flujo de cambios para supervisar los cambios en tiempo real de tus datos. Un flujo de cambios es una funcionalidad de MongoDB Server que permite que la aplicación se suscriba a cambios en los datos de una colección, base de datos o implementación.

Los ejemplos de esta guía usan la colección sample_restaurants.restaurants de los conjuntos de datos de muestra de Atlas. Para aprender a crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulta la guía Comenzar.

Importante

Proyecto Reactor librería

Esta guía usa la librería Proyecto Reactor para consumir instancias Publisher devueltas por los métodos del driver Reactive Streams de Java. Para obtener más información sobre la biblioteca Project Reactor y cómo utilizarla, consulta Primeros pasos en la documentación de Reactor. Para obtener más información sobre cómo utilizamos los métodos de la librería Project Reactor en esta guía, consulta la guía Guardar datos en MongoDB.

Para abrir un flujo de cambios, llama al método watch(). La instancia sobre la que llama el método determina el alcance de los eventos a los que el flujo de cambios escucha. Puedes llamar al método watch() en instancias de las siguientes clases:

  • MongoClient: Para supervisar todos los cambios en la implementación de MongoDB

  • MongoDatabase: Para supervisar los cambios en todas las colecciones de la base de datos

  • MongoCollection: Para supervisar los cambios en la colección

El siguiente ejemplo abre un flujo de cambios en la colección restaurants y muestra los cambios a medida que ocurren:

// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch();
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

Para comenzar a observar los cambios, ejecuta la aplicación. Luego, en una aplicación o shell por separado, realiza una operación de guardar en la colección restaurants. Actualizar un documento que tiene un valor de campo "name" de "Blarney Castle" da como resultado la siguiente salida de flujo de cambios:

Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null,
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{value=...}}

Puedes pasar un pipeline de agregación como parámetro al método watch() para modificar la salida del flujo de cambios. Este parámetro te permite supervisar solo eventos de cambio especificados.

Puedes especificar las siguientes etapas de agregación en el parámetro pipeline:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

El siguiente ejemplo pasa un pipeline de agregación a un flujo de cambios para registrar solo las operaciones de actualización:

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

Para obtener más información sobre cómo modificar la salida de tu flujo de cambios, consulta la sección Modificar salida del flujo de cambio en el manual del MongoDB Server.

Puedes encadenar métodos al método watch() que representan opciones que puedes usar para configurar la operación del flujo de cambios. Si no especificas ninguna opción, el driver no personaliza la operación.

La siguiente tabla describe los métodos que puedas encadenar a watch() para personalizar su comportamiento:

Opción
Descripción

fullDocument()

Especifica si se debe mostrar el documento completo después de la modificación, en lugar de mostrar solo los cambios realizados. Para obtener más información sobre esta opción, consulte Incluir imágenes previas y posteriores.

fullDocumentBeforeChange()

Especifica si se debe mostrar el documento completo tal como estaba antes del cambio, en lugar de mostrar solo las modificaciones realizadas en el documento. Para obtener más información sobre esta opción, consulte Incluir imágenes previas y posteriores.

resumeAfter()

Indica watch() a que reanude la devolución de cambios después de la operación especificada en el token de reanudación.
Cada documento de evento de flujo de cambios incluye un token de reanudación como el _id campo. Pase el _id campo completo del documento de evento de cambio que representa la operación que desea reanudar.
resumeAfter() es mutuamente excluyente con startAfter() startAtOperationTime()y.

startAfter()

Indica a watch() que inicie un nuevo flujo de cambios después de la operación especificada en el token de reanudación. Permite que las notificaciones se reanuden después de un evento de invalidación.
Cada documento de evento de flujo de cambios incluye un token de reanudación como el _id campo. Pase el _id campo completo del documento de evento de cambio que representa la operación que desea reanudar.
startAfter() es mutuamente excluyente con resumeAfter() startAtOperationTime()y.

startAtOperationTime()

Indica watch() a que devuelva solo los eventos que ocurran después de la marca de tiempo especificada.
startAtOperationTime() es mutuamente excluyente con resumeAfter() startAfter()y.

maxAwaitTime()

Especifica el tiempo máximo, en milisegundos, que el servidor espera para reportar nuevos cambios de datos al cursor del flujo de cambios antes de devolver un grupo vacío. Por defecto 1000 milisegundos.

showExpandedEvents()

A partir de MongoDB Server v6.0, los flujos de cambios admiten notificaciones de cambios para eventos de Data Definition Language (DDL), tales como los eventos createIndexes y dropIndexes. Para incluir eventos expandidos en un flujo de cambios, llama a este método y pasa el valor, true.

batchSize()

Especifica el número máximo de eventos de cambio que se devolverán en cada lote de la respuesta del clúster de MongoDB. Por defecto, el controlador establece este valor en Long.MAX_VALUE.

Un batchSize de 0 significa que el cursor se establecerá, pero no se devolverá ningún documento en el primer lote.

collation()

Especifica la intercalación que se utilizará para el cursor de flujo de cambios.

comment()

Adjunta un comentario a la operación.

Importante

Puede habilitar pre imágenes y post imágenes en colecciones solo si su implementación utiliza MongoDB v6.0 o posterior.

De forma predeterminada, cuando realizas una operación en una colección, el evento de cambio correspondiente solo incluye el delta de los campos modificados por esa operación. Para ver el documento completo antes o después de un cambio, encadena el método fullDocumentBeforeChange() o fullDocument() al método watch().

La preimagen es la versión completa de un documento antes de un cambio. Para incluir la preimagen en el evento de flujo de cambios, se debe pasar uno de los siguientes valores al método fullDocumentBeforeChange():

  • FullDocumentBeforeChange.WHEN_AVAILABLE: El evento de cambio incluye una preimagen del documento modificado para los eventos de cambio solo si la preimagen está disponible.

  • FullDocumentBeforeChange.REQUIREDEl evento de cambio incluye una preimagen del documento modificado para los eventos de cambio. Si la preimagen no está disponible, el driver genera un error.

La post-imagen es la versión completa de un documento después de un cambio. Para incluir la post-imagen en el evento del flujo de cambios, pasa uno de los siguientes valores al método fullDocument():

  • FullDocument.UPDATE_LOOKUP: El evento de cambio incluye una copia de todo el documento cambiado a partir de cierto tiempo después del cambio.

  • FullDocument.WHEN_AVAILABLE: El evento de cambio incluye una postimagen del documento modificado para eventos de cambio solo si la postimagen está disponible.

  • FullDocument.REQUIRED: El evento de cambio incluye una imagen posterior del documento modificado para eventos de cambio. Si la imagen posterior no está disponible, el driver genera un error.

El siguiente ejemplo abre un flujo de cambios en una colección e incluye la imagen posterior de documentos actualizados enlazando el método fullDocument() al método watch():

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received including the full
// document after the update
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

Para obtener más información sobre pre-imágenes y post-imágenes, consulta Change Streams con preimágenes y postimágenes de documentos en el manual de MongoDB Server.

Para saber más sobre las change streams, consulta Change Streams en el manual de MongoDB Server.

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: