Docs Menu
Docs Home
/ /

Flujos de cambio abiertos

En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en sus datos en tiempo real. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una sola colección, base de datos o implementación.

Puedes abrir un flujo de cambios para ayudar a realizar las siguientes acciones:

  • Permitir que los dispositivos rastreen y reaccionen a eventos, como cámaras con detección de movimiento.

  • Crear una aplicación que monitoree los cambios en los precios de las acciones

  • Generar un registro de la actividad de los empleados para puestos de trabajo específicos

Puede especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe su aplicación. Al conectarse a una implementación de MongoDB v6.0 o posterior, también puede configurar los eventos para que incluyan los datos del documento antes y después del cambio.

Para aprender a abrir y configurar un flujo de cambios, navegue a las siguientes secciones:

  • Datos de muestra

  • Abre un flujo de cambios

  • Aplicar operadores de agregación a su flujo de cambios

  • Incluir imágenes previas y posteriores

  • Información Adicional

Los ejemplos de esta guía controlan los cambios en el directors Colección. Supongamos que esta colección contiene los siguientes documentos, modelados como estructuras:

let docs = vec! [
Director {
name: "Todd Haynes".to_string(),
movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()],
oscar_noms: 1,
},
Director {
name: "Jane Campion".to_string(),
movies: vec! ["The Piano".to_string(), "Bright Star".to_string()],
oscar_noms: 5,
}
];

Tip

Para aprender a insertar documentos en una colección, consulte la Guía deinserción de documentos.

Puede abrir un flujo de cambios para suscribirse a tipos específicos de cambios de datos y producir eventos de cambio en su aplicación.

Para abrir un flujo de cambios, llame al método watch() en una instancia Collection, Database o Client.

Importante

Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página Registro de operaciones del conjunto de réplicas en el manual del servidor.

El tipo de estructura en el que se llama al método watch() determina el alcance de los eventos que escucha el flujo de cambios. La siguiente tabla describe el comportamiento del método watch() según el objeto que lo llama:

Tipo de estructura
Comportamiento de watch()

Collection

Monitors changes to the individual collection

Database

Monitors changes to all collections in the database

Client

Monitors all changes in the connected MongoDB deployment

El siguiente ejemplo abre un flujo de cambios en la colección directors. El código imprime el tipo de operación y el documento modificado de cada evento de cambio accediendo a los campos operation_type y full_document de cada instancia ChangeStreamEvent:

let mut change_stream = my_coll.watch().await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Document: {:?}", event.full_document);
}

Tip

Para obtener una lista completa de los ChangeStreamEvent campos de estructura, consulte ChangeStreamEvent en la documentación de la API.

Si inserta un documento en la colección directors en el que el valor name es "Wes Anderson", el código anterior produce el siguiente resultado:

Operation performed: Insert
Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })

Puede encadenar el método pipeline() con el método watch() para especificar qué eventos de cambio recibe el flujo de cambios. Pase una canalización de agregación como parámetro a pipeline().

Para saber qué operadores de agregación admite su versión de MongoDB Server, consulte Modificar la salida del flujo de cambios en el manual del servidor.

El siguiente ejemplo crea una canalización de agregación para filtrar las operaciones de actualización. A continuación, el código pasa la canalización al método pipeline(), configurando el flujo de cambios para que solo reciba e imprima eventos de cambio para las operaciones de actualización:

let mut update_change_stream = my_coll.watch()
.pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }])
.await?;
while let Some(event) = update_change_stream.next().await.transpose()? {
println!("Update performed: {:?}", event.update_description);
}

Si actualiza el documento en el que el valor name es "Todd Haynes" aumentando el valor del campo oscar_noms, el código anterior produce el siguiente resultado:

Update performed: Some(UpdateDescription { updated_fields: Document({
"oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })

Tip

Para aprender a realizar operaciones de actualización, consulta la guía Modificar documentos.

Puede configurar el evento de cambio para que contenga u omita los siguientes datos:

  • Preimagen: un documento que representa la versión del documento antes de la operación, si existe

  • Post-imagen: un documento que representa la versión del documento después de la operación, si existe

Importante

Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.

Para recibir eventos de flujo de cambios que incluyan una imagen previa o posterior, debe realizar las siguientes acciones:

  • Habilite imágenes previas y posteriores para la colección en su implementación de MongoDB.

    Tip

    Para saber cómo habilitar imágenes previas y posteriores en su implementación, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual del servidor.

    Para saber cómo indicarle al controlador que cree una colección con imágenes previas y posteriores habilitadas, consulte la sección Crear una colección con imágenes previas y posteriores habilitadas de esta página.

  • Configure su flujo de cambios para recuperar las imágenes previas y posteriores, o ambas. Durante esta configuración, puede indicar al controlador que solicite imágenes previas y posteriores o que las incluya solo cuando estén disponibles.

    Tip

    Para configurar su flujo de cambios para registrar la imagen previa en eventos de cambio, consulte el Ejemplo de configuración de imagen previa en esta página.

    Para configurar su flujo de cambios para registrar la imagen posterior en eventos de cambio, consulte el Ejemplo de configuración de imagen posterior en esta página.

Para habilitar documentos de preimagen y postimagen en su colección, utilice el método de creación de opciones change_stream_pre_and_post_images(). El siguiente ejemplo utiliza este método para especificar las opciones de colección y crea una colección con preimagen y postimagen disponibles:

let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build();
let result = my_db.create_collection("directors")
.change_stream_pre_and_post_images(enable)
.await?;

Puede cambiar las opciones de preimagen y postimagen en una colección existente ejecutando el collMod comando desde MongoDB Shell o desde su aplicación. Para saber cómo realizar esta operación, consulte la guía "Ejecutar un comando" y la entrada sobre collMod en el manual del servidor.

Advertencia

Si habilitó imágenes previas o posteriores en una colección, modificar estas configuraciones con collMod puede provocar que finalicen los flujos de cambios existentes en esa colección.

Para configurar un flujo de cambios que devuelva eventos de cambio que contengan la preimagen, utilice el método de creación de opciones full_document_before_change(). El siguiente ejemplo especifica las opciones del flujo de cambios y crea un flujo de cambios que devuelve documentos de preimagen:

let pre_image = FullDocumentBeforeChangeType::Required;
let mut change_stream = my_coll.watch()
.full_document_before_change(pre_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Pre-image: {:?}", event.full_document_before_change);
}

El ejemplo anterior pasa el valor FullDocumentBeforeChangeType::Required al método del generador de opciones full_document_before_change(). Este método configura el flujo de cambios para requerir imágenes previas para los eventos de reemplazo, actualización y eliminación. Si la imagen previa no está disponible, el controlador genera un error.

Si actualiza un documento en el que el valor name es "Jane Campion", el evento de cambio produce el siguiente resultado:

Operation performed: Update
Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano",
"Bright Star"], oscar_noms: 5 })

Para configurar un flujo de cambios que devuelva eventos de cambio que contengan la imagen posterior, utiliza el método builder de la opción full_document(). El siguiente ejemplo especifica las opciones de flujo de cambios y crea un flujo de cambios que devuelve documentos post-imagen:

let post_image = FullDocumentType::WhenAvailable;
let mut change_stream = my_coll.watch()
.full_document(post_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Post-image: {:?}", event.full_document);
}

El ejemplo anterior pasa el valor FullDocument::WhenAvailable al método del generador de opciones full_document(). Este método configura el flujo de cambios para devolver imágenes posteriores para los eventos de reemplazo, actualización y eliminación, si la imagen posterior está disponible.

Si elimina el documento en el que el valor de name es "Todd Haynes", el evento de cambio produce la siguiente salida:

Operation performed: Delete
Post-image: None

Para obtener más información sobre cualquiera de los métodos o tipos mencionados en esta guía, consulte la siguiente documentación de API:

Volver

Cursores de datos

En esta página