Overview
En esta guía, puedes aprender a usar un flujo de cambios para supervisar los cambios en tiempo real de tus datos. Un flujo de cambios es una funcionalidad del MongoDB Server que permite a tu aplicación suscribirse a los cambios de datos en una única colección, base de datos o implementación.
Puedes abrir un flujo de cambios para ayudar a realizar las siguientes acciones:
Permitir que los dispositivos rastreen y reaccionen a eventos, como cámaras detectores de movimiento
Cree una aplicación que supervise los cambios en los precios de las acciones
Generar un registro de la actividad de los empleados para puestos de trabajo específicos
Puedes especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe tu aplicación. Cuando te conectes a una implementación de MongoDB v6.0 o posterior, también puedes configurar los eventos para que incluyan los datos del documento antes y después del cambio.
Para aprender a abrir y configurar un flujo de cambios, navega a las siguientes secciones:
Datos de muestra
Los ejemplos de esta guía supervisan los cambios en el directors Colección. Supongamos que esta colección contiene los siguientes documentos, modelados como estructuras:
let docs = vec! [ Director { name: "Todd Haynes".to_string(), movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()], oscar_noms: 1, }, Director { name: "Jane Campion".to_string(), movies: vec! ["The Piano".to_string(), "Bright Star".to_string()], oscar_noms: 5, } ];
Tip
Para aprender a insertar documentos en una colección, consulte la Guía deinserción de documentos.
Abre un flujo de cambios
Puedes abrir un flujo de cambios para suscribirte a tipos específicos de cambios de datos y producir eventos de cambios en tu aplicación.
Para abrir un flujo de cambios, llame al método watch() en una instancia Collection, Database o Client.
Importante
Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página Registro de operaciones del conjunto de réplicas en el manual del servidor.
El tipo de estructura sobre el que llames el watch() método determina el alcance de los eventos para los que el flujo de cambios escucha. La siguiente tabla describe el comportamiento del método watch() según el objeto que lo llama:
Tipo de struct | Comportamiento de watch() |
|---|---|
| Monitors changes to the individual collection |
| Monitors changes to all collections in the database |
| Monitors all changes in the connected MongoDB deployment |
Ejemplo
El siguiente ejemplo abre un flujo de cambios en la colección directors. El código imprime el tipo de operación y el documento modificado de cada evento de cambio accediendo a los campos operation_type y full_document de cada instancia ChangeStreamEvent:
let mut change_stream = my_coll.watch().await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Document: {:?}", event.full_document); }
Tip
Para obtener una lista completa de los campos de la estructura ChangeStreamEvent, consulta ChangeStreamEvent en la documentación de la API.
Si inserta un documento en la colección directors en el que el valor name es "Wes Anderson", el código anterior produce el siguiente resultado:
Operation performed: Insert Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })
Aplicar operadores de agregación al flujo de cambios
Se puede encadenar el método pipeline() al método watch() para especificar qué eventos de cambio recibe el flujo de cambios. Pase un pipeline de agregación como un parámetro a pipeline().
Para aprender qué operadores de agregación admite tu versión de MongoDB Server, consulta Modificar salida de flujo de cambios en el manual del servidor.
Ejemplo
El siguiente ejemplo crea una canalización de agregación para filtrar las operaciones de actualización. A continuación, el código pasa la canalización al método pipeline(), configurando el flujo de cambios para que solo reciba e imprima eventos de cambio para las operaciones de actualización:
let mut update_change_stream = my_coll.watch() .pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }]) .await?; while let Some(event) = update_change_stream.next().await.transpose()? { println!("Update performed: {:?}", event.update_description); }
Si actualiza el documento en el que el valor name es "Todd Haynes" aumentando el valor del campo oscar_noms, el código anterior produce el siguiente resultado:
Update performed: Some(UpdateDescription { updated_fields: Document({ "oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })
Tip
Para aprender a realizar operaciones de actualización, consulta la guía Modificar documentos.
Incluir imágenes preoperativas y postoperatorias
Puedes configurar el evento de cambio para que contenga u omita los siguientes datos:
Preimagen: un documento que representa la versión del documento antes de la operación, si existe
Post-imagen: un documento que representa la versión del documento después de la operación, si existe
Importante
Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.
Para recibir eventos de flujo de cambios que incluyan una preimagen o una postimagen, debes realizar las siguientes acciones:
Habilita las preimágenes y las postimágenes para la colección en tu implementación de MongoDB.
Tip
Para obtener instrucciones sobre cómo habilitar imágenes pre/post en la implementación, consulte Change Streams con imágenes pre/post en documentos en el manual del servidor.
Para aprender a indicar al controlador que cree una colección con preimágenes y postimágenes habilitadas, consulta la sección Crea una colección con preimágenes y postimágenes habilitadas de esta página.
Configura tu flujo de cambios para recuperar o bien las preimágenes, las postimágenes o ambas. Durante esta configuración, se puede indicar al controlador que requiera imágenes previas y posteriores o incluirlas solo cuando estén disponibles.
Tip
Para configurar su flujo de cambios para registrar la imagen previa en eventos de cambio, consulte el Ejemplo de configuración de imagen previa en esta página.
Para configurar tu flujo de cambios para grabar la posimagen en los eventos de cambio, consulta el Ejemplo de Configuración de la Posimagen en esta página.
Crear una colección con imágenes previas y posteriores habilitadas
Para habilitar documentos de preimagen y documento post-imagen para tu colección, utiliza el método de creación de opciones change_stream_pre_and_post_images(). El siguiente ejemplo utiliza este método constructor para especificar opciones de colección y crea una colección para la que están disponibles imágenes previas y posteriores:
let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build(); let result = my_db.create_collection("directors") .change_stream_pre_and_post_images(enable) .await?;
Puede cambiar las opciones de preimagen y postimagen en una colección existente ejecutando el collMod comando desde MongoDB Shell o desde su aplicación. Para saber cómo realizar esta operación, consulte la guía "Ejecutar un comando" y la entrada sobre collMod en el manual del servidor.
Advertencia
Si habilitaste preimágenes o postimágenes en una colección, modificar esta configuración con collMod puede provocar que los flujos de cambios existentes en esa colección se terminen.
Ejemplo de Configuración Pre-Imagen
Para configurar un flujo de cambios que devuelva eventos de cambios que contengan la preimagen, utiliza el método de configuración de opciones full_document_before_change(). El siguiente ejemplo especifica las opciones del flujo de cambios y crea un flujo de cambios que devuelve documentos pre-imagen:
let pre_image = FullDocumentBeforeChangeType::Required; let mut change_stream = my_coll.watch() .full_document_before_change(pre_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Pre-image: {:?}", event.full_document_before_change); }
El ejemplo anterior pasa un valor de FullDocumentBeforeChangeType::Required al método del generador de opciones full_document_before_change(). Este método configura el flujo de cambios para exigir pre-imágenes en los eventos de cambio de reemplazo, actualizar y borrar. Si la pre-imagen no está disponible, el controlador genera un error.
Si actualizas un documento en el que el valor de name es "Jane Campion", el evento de cambio produce el siguiente resultado:
Operation performed: Update Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano", "Bright Star"], oscar_noms: 5 })
Ejemplo de Configuración Posterior a la Imagen
Para configurar un flujo de cambios que devuelva eventos de cambio que contengan la imagen posterior, utiliza el método builder de la opción full_document(). El siguiente ejemplo especifica las opciones de flujo de cambios y crea un flujo de cambios que devuelve documentos post-imagen:
let post_image = FullDocumentType::WhenAvailable; let mut change_stream = my_coll.watch() .full_document(post_image) .await?; while let Some(event) = change_stream.next().await.transpose()? { println!("Operation performed: {:?}", event.operation_type); println!("Post-image: {:?}", event.full_document); }
El ejemplo anterior pasa un valor de FullDocument::WhenAvailable al método del generador de opciones full_document(). Este método configura el flujo de cambios para devolver post-imágenes para los eventos de cambios de reemplazo, actualización y eliminación si la post-imagen está disponible.
Si elimina el documento en el que el valor de name es "Todd Haynes", el evento de cambio produce la siguiente salida:
Operation performed: Delete Post-image: None
Información Adicional
Documentación de la API
Para obtener más información sobre cualquiera de los métodos o tipos mencionados en esta guía, consulta la siguiente documentación API: