Overview
En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en sus datos en tiempo real. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una sola colección, base de datos o implementación.
Puedes abrir un flujo de cambios para ayudar a realizar las siguientes acciones:
Permitir que los dispositivos rastreen y reaccionen a eventos, como cámaras con detección de movimiento.
Crear una aplicación que monitoree los cambios en los precios de las acciones
Generar un registro de la actividad de los empleados para puestos de trabajo específicos
Puede especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe su aplicación. Al conectarse a una implementación de MongoDB v6.0 o posterior, también puede configurar los eventos para que incluyan los datos del documento antes y después del cambio.
Para aprender a abrir y configurar un flujo de cambios, navegue a las siguientes secciones:
Datos de muestra
Los ejemplos de esta guía controlan los cambios en el directors Colección. Supongamos que esta colección contiene los siguientes documentos, modelados como estructuras:
let docs = vec! [ Director { name: "Todd Haynes".to_string(), movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()], oscar_noms: 1, }, Director { name: "Jane Campion".to_string(), movies: vec! ["The Piano".to_string(), "Bright Star".to_string()], oscar_noms: 5, } ];
Tip
Para aprender a insertar documentos en una colección, consulte la Guía deinserción de documentos.
Abre un flujo de cambios
Puede abrir un flujo de cambios para suscribirse a tipos específicos de cambios de datos y producir eventos de cambio en su aplicación.
Para abrir un flujo de cambios, llame al método watch() en una instancia Collection, Database o Client.
Importante
Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página Registro de operaciones del conjunto de réplicas en el manual del servidor.
El tipo de estructura en el que se llama al método watch() determina el alcance de los eventos que escucha el flujo de cambios. La siguiente tabla describe el comportamiento del método watch() según el objeto que lo llama:
Tipo de estructura | Comportamiento de watch() |
|---|---|
| Monitors changes to the individual collection |
| Monitors changes to all collections in the database |
| Monitors all changes in the connected MongoDB deployment |
Ejemplo
El siguiente ejemplo abre un flujo de cambios en la colección directors. El código imprime el tipo de operación y el documento modificado de cada evento de cambio accediendo a los campos operation_type y full_document de cada instancia ChangeStreamEvent:
let mut change_stream = my_coll.watch().await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Document: {:?}", event.full_document); }
Tip
Para obtener una lista completa de los ChangeStreamEvent campos de estructura, consulte ChangeStreamEvent en la documentación de la API.
Si inserta un documento en la colección directors en el que el valor name es "Wes Anderson", el código anterior produce el siguiente resultado:
Operation performed: Insert Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })
Aplicar operadores de agregación a su flujo de cambios
Puede encadenar el método pipeline() con el método watch() para especificar qué eventos de cambio recibe el flujo de cambios. Pase una canalización de agregación como parámetro a pipeline().
Para saber qué operadores de agregación admite su versión de MongoDB Server, consulte Modificar la salida del flujo de cambios en el manual del servidor.
Ejemplo
El siguiente ejemplo crea una canalización de agregación para filtrar las operaciones de actualización. A continuación, el código pasa la canalización al método pipeline(), configurando el flujo de cambios para que solo reciba e imprima eventos de cambio para las operaciones de actualización:
let mut update_change_stream = my_coll.watch() .pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }]) .await?; while let Some(event) = update_change_stream.next().await.transpose()? { println!("Update performed: {:?}", event.update_description); }
Si actualiza el documento en el que el valor name es "Todd Haynes" aumentando el valor del campo oscar_noms, el código anterior produce el siguiente resultado:
Update performed: Some(UpdateDescription { updated_fields: Document({ "oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })
Tip
Para aprender a realizar operaciones de actualización, consulta la guía Modificar documentos.
Incluir imágenes previas y posteriores
Puede configurar el evento de cambio para que contenga u omita los siguientes datos:
Preimagen: un documento que representa la versión del documento antes de la operación, si existe
Post-imagen: un documento que representa la versión del documento después de la operación, si existe
Importante
Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.
Para recibir eventos de flujo de cambios que incluyan una imagen previa o posterior, debe realizar las siguientes acciones:
Habilite imágenes previas y posteriores para la colección en su implementación de MongoDB.
Tip
Para saber cómo habilitar imágenes previas y posteriores en su implementación, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual del servidor.
Para saber cómo indicarle al controlador que cree una colección con imágenes previas y posteriores habilitadas, consulte la sección Crear una colección con imágenes previas y posteriores habilitadas de esta página.
Configure su flujo de cambios para recuperar las imágenes previas y posteriores, o ambas. Durante esta configuración, puede indicar al controlador que solicite imágenes previas y posteriores o que las incluya solo cuando estén disponibles.
Tip
Para configurar su flujo de cambios para registrar la imagen previa en eventos de cambio, consulte el Ejemplo de configuración de imagen previa en esta página.
Para configurar su flujo de cambios para registrar la imagen posterior en eventos de cambio, consulte el Ejemplo de configuración de imagen posterior en esta página.
Crear una colección con imágenes previas y posteriores habilitadas
Para habilitar documentos de preimagen y postimagen en su colección, utilice el método de creación de opciones change_stream_pre_and_post_images(). El siguiente ejemplo utiliza este método para especificar las opciones de colección y crea una colección con preimagen y postimagen disponibles:
let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build(); let result = my_db.create_collection("directors") .change_stream_pre_and_post_images(enable) .await?;
Puede cambiar las opciones de preimagen y postimagen en una colección existente ejecutando el collMod comando desde MongoDB Shell o desde su aplicación. Para saber cómo realizar esta operación, consulte la guía "Ejecutar un comando" y la entrada sobre collMod en el manual del servidor.
Advertencia
Si habilitó imágenes previas o posteriores en una colección, modificar estas configuraciones con collMod puede provocar que finalicen los flujos de cambios existentes en esa colección.
Ejemplo de configuración de preimagen
Para configurar un flujo de cambios que devuelva eventos de cambio que contengan la preimagen, utilice el método de creación de opciones full_document_before_change(). El siguiente ejemplo especifica las opciones del flujo de cambios y crea un flujo de cambios que devuelve documentos de preimagen:
let pre_image = FullDocumentBeforeChangeType::Required; let mut change_stream = my_coll.watch() .full_document_before_change(pre_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Pre-image: {:?}", event.full_document_before_change); }
El ejemplo anterior pasa el valor FullDocumentBeforeChangeType::Required al método del generador de opciones full_document_before_change(). Este método configura el flujo de cambios para requerir imágenes previas para los eventos de reemplazo, actualización y eliminación. Si la imagen previa no está disponible, el controlador genera un error.
Si actualiza un documento en el que el valor name es "Jane Campion", el evento de cambio produce el siguiente resultado:
Operation performed: Update Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano", "Bright Star"], oscar_noms: 5 })
Ejemplo de configuración posterior a la imagen
Para configurar un flujo de cambios que devuelva eventos de cambio que contengan la imagen posterior, utiliza el método builder de la opción full_document(). El siguiente ejemplo especifica las opciones de flujo de cambios y crea un flujo de cambios que devuelve documentos post-imagen:
let post_image = FullDocumentType::WhenAvailable; let mut change_stream = my_coll.watch() .full_document(post_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Post-image: {:?}", event.full_document); }
El ejemplo anterior pasa el valor FullDocument::WhenAvailable al método del generador de opciones full_document(). Este método configura el flujo de cambios para devolver imágenes posteriores para los eventos de reemplazo, actualización y eliminación, si la imagen posterior está disponible.
Si elimina el documento en el que el valor de name es "Todd Haynes", el evento de cambio produce la siguiente salida:
Operation performed: Delete Post-image: None
Información Adicional
Documentación de la API
Para obtener más información sobre cualquiera de los métodos o tipos mencionados en esta guía, consulte la siguiente documentación de API: