Docs Menu
Docs Home
/ /
/ / /

Supervisar los cambios en los datos

En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en sus datos en tiempo real. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.

Los ejemplos de esta guía utilizan el sample_restaurants.restaurants colección de la Conjuntos de datos de muestra de Atlas. Para aprender a crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulte Guía deinicio rápido.

Importante

Proyecto Reactor librería

Esta guía utiliza la biblioteca Project Reactor para consumir Publisher las instancias devueltas por los métodos del controlador Java Reactive Streams. Para obtener más información sobre la biblioteca Project Reactor y cómo usarla, consulte la sección "Introducción" en la documentación de Reactor. Para obtener más información sobre cómo usamos los métodos de la biblioteca Project Reactor en esta guía, consulte la guía "Escribir datos en MongoDB".

Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método determina el alcance de los eventos que el flujo de cambios escucha. Puede llamar al método watch() en instancias de las siguientes clases:

  • MongoClient:Para monitorear todos los cambios en la implementación de MongoDB

  • MongoDatabase:Para monitorear los cambios en todas las colecciones de la base de datos

  • MongoCollection:Para monitorear cambios en la colección

El siguiente ejemplo abre un flujo de cambios en la colección restaurants y muestra los cambios a medida que ocurren:

// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch();
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

Para empezar a observar los cambios, ejecute la aplicación. A continuación, en una aplicación o shell independiente, realice una operación de escritura en la colección restaurants. Actualizar un documento cuyo valor de campo "name" es "Blarney Castle" genera la siguiente salida del flujo de cambios:

Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null,
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{value=...}}

Puede pasar una canalización de agregación como parámetro al método watch() para modificar la salida del flujo de cambios. Este parámetro le permite observar únicamente los eventos de cambio especificados.

Puede especificar las siguientes etapas de agregación en el parámetro pipeline:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

El siguiente ejemplo pasa una canalización de agregación a un flujo de cambios para registrar solo operaciones de actualización:

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

Para obtener más información sobre cómo modificar la salida de su flujo de cambios, consulte la sección Modificar la salida del flujo de cambios en el manual de MongoDB Server.

Puede encadenar métodos al método watch() que representan opciones que puede usar para configurar la operación del flujo de cambios. Si no especifica ninguna opción, el controlador no personaliza la operación.

La siguiente tabla describe los métodos que puedes encadenar a watch() para personalizar su comportamiento:

Opción
Descripción

fullDocument()

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

fullDocumentBeforeChange()

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

resumeAfter()

Directs watch() to resume returning changes after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
resumeAfter() is mutually exclusive with startAfter() and startAtOperationTime().

startAfter()

Directs watch() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
startAfter() is mutually exclusive with resumeAfter() and startAtOperationTime().

startAtOperationTime()

Directs watch() to return only events that occur after the specified timestamp.
startAtOperationTime() is mutually exclusive with resumeAfter() and startAfter().

maxAwaitTime()

Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.

showExpandedEvents()

Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, call this method and pass in the value, true.

batchSize()

Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. By default, returns an initial batch size of 101 documents and a maximum size of 16 mebibytes (MiB) for each subsequent batch. This option can enforce a smaller limit than 16 MiB, but not a larger one. If you set batchSize to a limit that results in batches larger than 16 MiB, this option has no effect.
A batchSize of 0 means that the cursor will be established, but no documents will be returned in the first batch.

collation()

Specifies the collation to use for the change stream cursor.

comment()

Attaches a comment to the operation.

Importante

Puede habilitar imágenes previas y posteriores en colecciones solo si su implementación usa MongoDB v6.0 o posterior.

De forma predeterminada, al realizar una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por dicha operación. Para ver el documento completo antes o después de un cambio, encadene el método fullDocumentBeforeChange() o fullDocument() al método watch().

La preimagen es la versión completa de un documento antes de un cambio. Para incluirla en el evento de flujo de cambios, pase uno de los siguientes valores al fullDocumentBeforeChange() método:

  • FullDocumentBeforeChange.WHEN_AVAILABLE:El evento de cambio incluye una imagen previa del documento modificado para eventos de cambio solo si la imagen previa está disponible.

  • FullDocumentBeforeChange.REQUIREDEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el controlador genera un error.

La imagen posterior es la versión completa de un documento después de un cambio. Para incluirla en el evento de flujo de cambios, pase uno de los siguientes valores al fullDocument() método:

  • FullDocument.UPDATE_LOOKUP:El evento de cambio incluye una copia de todo el documento modificado desde algún tiempo después del cambio.

  • FullDocument.WHEN_AVAILABLE:El evento de cambio incluye una imagen posterior del documento modificado solo para eventos de cambio si la imagen posterior está disponible.

  • FullDocument.REQUIREDEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el controlador genera un error.

El siguiente ejemplo abre un flujo de cambios en una colección e incluye la imagen posterior de los documentos actualizados encadenando el método fullDocument() al método watch():

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received including the full
// document after the update
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

Para obtener más información sobre imágenes previas y posteriores, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual de MongoDB Server.

Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB Server.

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API:

Volver

Búsqueda geoespacial

En esta página