Docs Menu
Docs Home
/ /

Flujos de cambio

En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en tiempo real en su base de datos. Un flujo de cambios es una función del servidor MongoDB que permite a su aplicación suscribirse a los cambios de datos en una sola colección, base de datos o implementación. Puede especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe su aplicación. Al conectarse a MongoDB v6.0 o posterior, puede configurar los eventos para incluir los datos del documento antes y después del cambio.

Aprenda a abrir y configurar sus flujos de cambio en las siguientes secciones:

  • Abre un flujo de cambios

  • Aplicar operadores de agregación a su flujo de cambios

  • Dividir eventos de flujo de cambios grandes

  • Incluir preimágenes y postimágenes

Puede abrir un flujo de cambios para suscribirse a tipos específicos de cambios de datos y producir eventos de cambio en su aplicación.

Para abrir un flujo de cambios, llame al watch() método en una instancia de MongoCollection, MongoDatabase o MongoClient.

Importante

Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página del manual del servidor Registro de operaciones del conjunto de réplicas.

El objeto sobre el cual llamas al método watch() determina el alcance de los eventos a los que el flujo de cambios está atento.

Si llama a watch() en un MongoCollection, el flujo de cambios monitorea una colección.

Si llamas a watch() en una MongoDatabase, el flujo de cambios supervisa todas las colecciones en esa base de datos.

Si llama a watch() en un MongoClient, el flujo de cambios monitorea todos los cambios en la implementación de MongoDB conectada.

El siguiente ejemplo de código muestra cómo abrir un flujo de cambios e imprimir eventos de flujo de cambios cada vez que cambian los datos en la colección:

// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

Una operación de inserción en la colección debería producir un resultado similar al siguiente texto:

Received a change event: ChangeStreamDocument{
operationType='insert',
resumeToken={"_data": "825EC..."},
namespace=myDb.myChangeStreamCollection,
...
}

Para obtener más información sobre el método watch(), consulte la siguiente documentación de API:

Puede pasar una canalización de agregación como parámetro al método watch() para especificar qué eventos de cambio recibe el flujo de cambios.

Para saber qué operadores de agregación admite su versión de servidor MongoDB, consulte Modificar la salida del flujo de cambios.

El siguiente ejemplo de código muestra cómo puede aplicar una canalización de agregación para configurar su flujo de cambios para recibir eventos de cambio solo para operaciones de inserción y actualización:

val pipeline = listOf(
Aggregates.match(Filters.`in`("operationType",
listOf("insert", "update")))
)
// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

Cuando el flujo de cambios recibe un evento de cambio de actualización, el ejemplo de código anterior genera el siguiente texto:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
...

Al conectarse a MongoDB v7.0 o posterior, puede usar el operador de agregación $changeStreamSplitLargeEvent para dividir los documentos de eventos que superen los 16 MB en fragmentos más pequeños.

Utilice el operador $changeStreamSplitLargeEvent solo cuando prevea que los eventos del flujo de cambios superarán el límite de tamaño del documento. Por ejemplo, puede usar esta función si su aplicación requiere imágenes previas o posteriores del documento completo.

Una etapa de agregación $changeStreamSplitLargeEvent devuelve fragmentos secuencialmente. Puede acceder a los fragmentos mediante un cursor de flujo de cambios. Cada documento de fragmento incluye un objeto splitEvent con los siguientes campos:

Campo
Descripción

fragment

El índice del fragmento, comenzando en 1

of

El número total de fragmentos que componen el evento dividido

El siguiente ejemplo abre un flujo de cambios que incluye una canalización de agregación con una etapa de agregación $changeStreamSplitLargeEvent para dividir eventos grandes:

val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

Nota

Solo puede tener una etapa $changeStreamSplitLargeEvent en su canal de agregación, y debe ser la última etapa del canal.

Para obtener más información sobre el $changeStreamSplitLargeEvent operador de agregación, consulte $changeStreamSplitLargeEvent (agregación) en el manual del servidor.

Puede configurar el evento de cambio para que contenga u omita los siguientes datos:

  • La preimagen que es un documento que representa la versión del documento antes de la operación si existe

  • La postimagen que es un documento que representa la versión del documento después de la operación si existe

Para recibir eventos de flujo de cambios que incluyan una imagen previa o posterior, debe conectarse a una implementación de MongoDB v6.0 o posterior y configurar lo siguiente:

  • Habilite imágenes previas y posteriores para la colección en su implementación de MongoDB.

    Tip

    Para saber cómo habilitarlos en su implementación, consulte la página del manual del servidor MongoDB Flujos de cambio con imágenes previas y posteriores al documento.

    Para aprender a indicar al driver que cree una colección con imágenes previas y posteriores habilitadas, consulte la sección Crear una colección con imágenes previas y posteriores habilitadas.

  • Configure su flujo de cambios para recuperar una o ambas imágenes previas o posteriores.

    Tip

    Para configurar su flujo de cambios para incluir la imagen previa, consulte el Ejemplo de configuración de imagen previa.

    Para configurar su flujo de cambios para incluir la imagen posterior, consulte el Ejemplo de configuración de imagen posterior.

Para crear una colección con la opción de preimagen y postimagen usando el controlador, especifique una instancia de ChangeStreamPreAndPostImagesOptions y llame al método createCollection() como se muestra en el siguiente ejemplo:

val collectionOptions = CreateCollectionOptions()
collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true))
database.createCollection("myChangeStreamCollection", collectionOptions)

Puede cambiar las opciones de preimagen y postimagen en una colección existente ejecutando el collMod comando desde MongoDB Shell. Para saber cómo realizar esta operación, consulte la documentación del manual del servidor collMod.

Advertencia

Cuando modifica esta opción en una colección, cualquier flujo de cambio abierto en esa colección en su aplicación puede fallar si está configurado para requerir recibir la imagen previa o la imagen posterior.

El siguiente ejemplo de código muestra cómo puede configurar un flujo de cambios para incluir la imagen previa y generar los resultados:

val job = launch {
val changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

El ejemplo anterior configura el flujo de cambios para usar la opción FullDocumentBeforeChange.REQUIRED. Esto configura el flujo de cambios para devolver imágenes previas para eventos de reemplazo, actualización y eliminación, y para que el servidor genere un error si la imagen previa no está disponible.

Supongamos que una aplicación actualizó el campo latestVersion de un documento en una colección de dependencias de bibliotecas de software del valor 2.0.0 al 2.1.0. El evento de cambio correspondiente, generado por el ejemplo de código anterior, debería ser similar al siguiente:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...}
namespace=software.libraries,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}},
...

Para obtener una lista de opciones, consulte la documentación de la API FullDocumentBeforeChange.

El siguiente ejemplo de código muestra cómo puede configurar un flujo de cambios para incluir la imagen posterior y generar los resultados:

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

El ejemplo anterior configura el flujo de cambios para usar la opción FullDocument.UPDATE_LOOKUP. Esto configura el flujo de cambios para devolver tanto las diferencias entre el documento original y el modificado como una copia del documento en algún momento posterior al cambio.

Supón que una aplicación actualizó el campo population de un documento del valor 800 al 950 en una colección de datos censales de ciudades. El evento de cambio correspondiente generado por el ejemplo de código anterior debería parecerse al siguiente texto:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
namespace=censusData.cities,
destinationNamespace=null,
fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}},
updatedFields={"population": 950}, ...
...

Para obtener una lista de opciones, consulte la documentación de la API FullDocument.

Volver

Monitoring

En esta página