Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Flujos de cambio

En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar los cambios en tiempo real de tu base de datos. Un flujo de cambios es una funcionalidad del servidor MongoDB que permite que tu aplicación se suscriba a los cambios de datos en una colección, base de datos o implementación. Se puede especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe tu aplicación. Al conectarse a MongoDB v6.0 o posterior, puede configurar los eventos para incluir los datos del documento antes y después del cambio.

Aprenda a abrir y configurar sus flujos de cambios en las siguientes secciones:

  • Abre un flujo de cambios

  • Aplicar operadores de agregación al flujo de cambios

  • Dividir eventos de flujo de cambios grandes

  • Incluir preimágenes y postimágenes

Puedes abrir un flujo de cambios para suscribirte a tipos específicos de cambios de datos y producir eventos de cambios en tu aplicación.

Para abrir un flujo de cambios, llame al watch() método en una instancia de un MongoCollection, MongoDatabase o MongoClient.

Importante

Las implementaciones autónomas de MongoDB no admiten flujos de cambios porque la funcionalidad requiere un oplog de set de réplicas. Para obtener más información sobre el oplog, consulta la página del manual del servidor Registro de operaciones del set de réplicas.

El objeto sobre el cual llamas al método watch() determina el alcance de los eventos a los que el flujo de cambios está atento.

Si llamas a watch() en un MongoCollection, el flujo de cambios supervisa una colección.

Si llamas a watch() en una MongoDatabase, el flujo de cambios supervisa todas las colecciones en esa base de datos.

Si llamas a watch() en un MongoClient, el flujo de cambios supervisa todos los cambios en la implementación de MongoDB conectada.

El siguiente ejemplo de código muestra cómo abrir un flujo de cambios e imprimir los eventos del flujo de cambios cada vez que cambian los datos en la colección:

// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

Una operación de inserción en la colección debería producir una salida similar al siguiente texto:

Received a change event: ChangeStreamDocument{
operationType='insert',
resumeToken={"_data": "825EC..."},
namespace=myDb.myChangeStreamCollection,
...
}

Para obtener más información sobre el método watch(), consulte la siguiente documentación de API:

Puedes pasar una pipeline de agregación como parámetro al método watch() para especificar qué eventos de cambio recibe el flujo de cambios.

Para saber qué operadores de agregación admite su versión de servidor MongoDB, consulte Modificar la salida del flujo de cambios.

El siguiente ejemplo de código muestra cómo puedes aplicar una pipeline de agregación para configurar tu flujo de cambios y recibir eventos de cambio solo para operaciones de inserción y actualización:

val pipeline = listOf(
Aggregates.match(Filters.`in`("operationType",
listOf("insert", "update")))
)
// Launch the change stream in a separate coroutine,
// so you can cancel it later.
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

Cuando el flujo de cambios recibe un evento de cambio de actualización, el ejemplo de código anterior muestra el siguiente texto:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
...

Cuando se conecta a MongoDB v7.0 o posterior, puede utilizar el operador de agregación $changeStreamSplitLargeEvent para dividir los documentos de eventos que superan los 16 MB en fragmentos más pequeños.

Utilice el operador $changeStreamSplitLargeEvent solo cuando espere que los eventos de flujo de cambios superen el límite de tamaño de documentos. Por ejemplo, puedes usar esta funcionalidad si tu aplicación requiere preimágenes o postimágenes completas de documentos.

Una etapa de agregación de $changeStreamSplitLargeEvent devuelve fragmentos de manera secuencial. Puedes acceder a los fragmentos utilizando un cursor de flujo de cambios. Cada documento de fragmento incluye un objeto splitEvent que contiene los siguientes campos:

Campo
Descripción

fragment

El índice del fragmento, comenzando en 1

of

El número total de fragmentos que componen el evento dividido

El siguiente ejemplo abre un flujo de cambios que incluye una canalización de agregación con una etapa de agregación $changeStreamSplitLargeEvent para dividir grandes eventos:

val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

Nota

Solo puedes tener una $changeStreamSplitLargeEvent etapa en tu pipeline de agregación y debe ser la última etapa en el pipeline.

Para aprender más sobre el operador de agregación $changeStreamSplitLargeEvent, consulta $changeStreamSplitLargeEvent (agregación) en el manual del servidor.

Puedes configurar el evento de cambio para que contenga u omita los siguientes datos:

  • La preimagen que es un documento que representa la versión del documento antes de la operación si existe

  • La postimagen que es un documento que representa la versión del documento después de la operación si existe

Para recibir eventos de flujo de cambios que incluyan una imagen previa o posterior, debe conectarse a una implementación de MongoDB v6.0 o posterior y configurar lo siguiente:

  • Habilita las preimágenes y las postimágenes para la colección en tu implementación de MongoDB.

    Tip

    Para saber cómo habilitarlos en su implementación, consulte la página del manual del servidor MongoDB Flujos de cambio con imágenes previas y posteriores al documento.

    Para aprender a indicar al driver que cree una colección con imágenes previas y posteriores habilitadas, consulte la sección Crear una colección con imágenes previas y posteriores habilitadas.

  • Configura tu flujo de cambios para recuperar imágenes previas, imágenes posteriores o ambas.

    Tip

    Para configurar tu flujo de cambios para que incluya la imagen previa, consulta el Ejemplo de configuración de imagen previa.

    Para configurar su flujo de cambios para incluir la imagen posterior, consulte el Ejemplo de configuración de imagen posterior.

Para crear una colección con la opción de preimagen y postimagen usando el driver, especifica una instancia de ChangeStreamPreAndPostImagesOptions y llama al método createCollection() como se muestra en el siguiente ejemplo:

val collectionOptions = CreateCollectionOptions()
collectionOptions.changeStreamPreAndPostImagesOptions(ChangeStreamPreAndPostImagesOptions(true))
database.createCollection("myChangeStreamCollection", collectionOptions)

Puede cambiar la opción de pre-imagen y post-imagen en una colección existente ejecutando el comando collMod desde el Shell de MongoDB. Para aprender cómo realizar esta operación, consulte la documentación del manual del servidor collMod.

Advertencia

Cuando modifica esta opción en una colección, cualquier flujo de cambio abierto en esa colección en su aplicación puede fallar si está configurado para requerir recibir la imagen previa o la imagen posterior.

El siguiente ejemplo de código muestra cómo puedes configurar un flujo de cambios para incluir la preimagen y mostrar los resultados:

val job = launch {
val changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

El ejemplo anterior configura el flujo de cambios para que use la opción FullDocumentBeforeChange.REQUIRED. Esto configura el flujo de cambios para que devuelva preimágenes para los eventos de cambio de reemplazo, actualización y borrado, y para que el servidor produzca un error si la preimagen no está disponible.

Supón que una aplicación actualizó el campo latestVersion de un documento en una colección de dependencias de la librería de software del valor de 2.0.0 a 2.1.0. La salida del evento de cambio correspondiente del ejemplo de código anterior debería parecerse al siguiente texto:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...}
namespace=software.libraries,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=6388..., latestVersion=2.0.0, ...}},
...

Para ver una lista de opciones, consulta la documentación de la API FullDocumentBeforeChange.

El siguiente ejemplo de código muestra cómo puedes configurar un flujo de cambios para incluir la imagen posterior y producir los resultados:

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}
// Perform MongoDB operations that trigger change events...
// Cancel the change stream when you're done listening for events.
job.cancel()

El ejemplo anterior configura el flujo de cambios para que use la opción FullDocument.UPDATE_LOOKUP. Esto configura el flujo de cambios para devolver tanto las diferencias entre el documento original y el documento modificado como una copia del documento en algún momento después de que ocurrió el cambio.

Supón que una aplicación actualizó el campo population de un documento del valor 800 al 950 en una colección de datos censales de ciudades. El evento de cambio correspondiente generado por el ejemplo de código anterior debería parecerse al siguiente texto:

Received a change event: ChangeStreamDocument{
operationType=update,
resumeToken={...},
namespace=censusData.cities,
destinationNamespace=null,
fullDocument=Document{{_id=6388..., city=Springfield, population=950, ...}},
updatedFields={"population": 950}, ...
...

Para obtener una lista de opciones, consulte la documentación de la API FullDocument.

Volver

Monitoring

En esta página