Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Abrir Change Streams

En esta guía, puedes aprender a usar un flujo de cambios para supervisar los cambios en tiempo real de tus datos. Un flujo de cambios es una funcionalidad del MongoDB Server que permite a tu aplicación suscribirse a los cambios de datos en una única colección, base de datos o implementación.

Puedes abrir un flujo de cambios para ayudar a realizar las siguientes acciones:

  • Permitir que los dispositivos rastreen y reaccionen a eventos, como cámaras detectores de movimiento

  • Cree una aplicación que supervise los cambios en los precios de las acciones

  • Generar un registro de la actividad de los empleados para puestos de trabajo específicos

Puedes especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe tu aplicación. Cuando te conectes a una implementación de MongoDB v6.0 o posterior, también puedes configurar los eventos para que incluyan los datos del documento antes y después del cambio.

Para aprender a abrir y configurar un flujo de cambios, navega a las siguientes secciones:

  • Datos de muestra

  • Abre un flujo de cambios

  • Aplicar operadores de agregación al flujo de cambios

  • Incluir imágenes preoperativas y postoperatorias

  • Información Adicional

Los ejemplos de esta guía supervisan los cambios en el directors Colección. Supongamos que esta colección contiene los siguientes documentos, modelados como estructuras:

let docs = vec! [
Director {
name: "Todd Haynes".to_string(),
movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()],
oscar_noms: 1,
},
Director {
name: "Jane Campion".to_string(),
movies: vec! ["The Piano".to_string(), "Bright Star".to_string()],
oscar_noms: 5,
}
];

Tip

Para aprender a insertar documentos en una colección, consulte la Guía deinserción de documentos.

Puedes abrir un flujo de cambios para suscribirte a tipos específicos de cambios de datos y producir eventos de cambios en tu aplicación.

Para abrir un flujo de cambios, llame al método watch() en una instancia Collection, Database o Client.

Importante

Las implementaciones independientes de MongoDB no admiten flujos de cambios porque esta función requiere un registro de operaciones del conjunto de réplicas. Para obtener más información sobre el registro de operaciones, consulte la página Registro de operaciones del conjunto de réplicas en el manual del servidor.

El tipo de estructura sobre el que llames el watch() método determina el alcance de los eventos para los que el flujo de cambios escucha. La siguiente tabla describe el comportamiento del método watch() según el objeto que lo llama:

Tipo de struct
Comportamiento de watch()

Collection

Monitors changes to the individual collection

Database

Monitors changes to all collections in the database

Client

Monitors all changes in the connected MongoDB deployment

El siguiente ejemplo abre un flujo de cambios en la colección directors. El código imprime el tipo de operación y el documento modificado de cada evento de cambio accediendo a los campos operation_type y full_document de cada instancia ChangeStreamEvent:

let mut change_stream = my_coll.watch().await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Document: {:?}", event.full_document);
}

Tip

Para obtener una lista completa de los campos de la estructura ChangeStreamEvent, consulta ChangeStreamEvent en la documentación de la API.

Si inserta un documento en la colección directors en el que el valor name es "Wes Anderson", el código anterior produce el siguiente resultado:

Operation performed: Insert
Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })

Se puede encadenar el método pipeline() al método watch() para especificar qué eventos de cambio recibe el flujo de cambios. Pase un pipeline de agregación como un parámetro a pipeline().

Para aprender qué operadores de agregación admite tu versión de MongoDB Server, consulta Modificar salida de flujo de cambios en el manual del servidor.

El siguiente ejemplo crea una canalización de agregación para filtrar las operaciones de actualización. A continuación, el código pasa la canalización al método pipeline(), configurando el flujo de cambios para que solo reciba e imprima eventos de cambio para las operaciones de actualización:

let mut update_change_stream = my_coll.watch()
.pipeline(vec![doc! { "$match" : doc! { "operationType" : "update" } }])
.await?;
while let Some(event) = update_change_stream.next().await.transpose()? {
println!("Update performed: {:?}", event.update_description);
}

Si actualiza el documento en el que el valor name es "Todd Haynes" aumentando el valor del campo oscar_noms, el código anterior produce el siguiente resultado:

Update performed: Some(UpdateDescription { updated_fields: Document({
"oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })

Tip

Para aprender a realizar operaciones de actualización, consulta la guía Modificar documentos.

Puedes configurar el evento de cambio para que contenga u omita los siguientes datos:

  • Preimagen: un documento que representa la versión del documento antes de la operación, si existe

  • Post-imagen: un documento que representa la versión del documento después de la operación, si existe

Importante

Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.

Para recibir eventos de flujo de cambios que incluyan una preimagen o una postimagen, debes realizar las siguientes acciones:

  • Habilita las preimágenes y las postimágenes para la colección en tu implementación de MongoDB.

    Tip

    Para obtener instrucciones sobre cómo habilitar imágenes pre/post en la implementación, consulte Change Streams con imágenes pre/post en documentos en el manual del servidor.

    Para aprender a indicar al controlador que cree una colección con preimágenes y postimágenes habilitadas, consulta la sección Crea una colección con preimágenes y postimágenes habilitadas de esta página.

  • Configura tu flujo de cambios para recuperar o bien las preimágenes, las postimágenes o ambas. Durante esta configuración, se puede indicar al controlador que requiera imágenes previas y posteriores o incluirlas solo cuando estén disponibles.

    Tip

    Para configurar su flujo de cambios para registrar la imagen previa en eventos de cambio, consulte el Ejemplo de configuración de imagen previa en esta página.

    Para configurar tu flujo de cambios para grabar la posimagen en los eventos de cambio, consulta el Ejemplo de Configuración de la Posimagen en esta página.

Para habilitar documentos de preimagen y documento post-imagen para tu colección, utiliza el método de creación de opciones change_stream_pre_and_post_images(). El siguiente ejemplo utiliza este método constructor para especificar opciones de colección y crea una colección para la que están disponibles imágenes previas y posteriores:

let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build();
let result = my_db.create_collection("directors")
.change_stream_pre_and_post_images(enable)
.await?;

Puede cambiar las opciones de preimagen y postimagen en una colección existente ejecutando el collMod comando desde MongoDB Shell o desde su aplicación. Para saber cómo realizar esta operación, consulte la guía "Ejecutar un comando" y la entrada sobre collMod en el manual del servidor.

Advertencia

Si habilitaste preimágenes o postimágenes en una colección, modificar esta configuración con collMod puede provocar que los flujos de cambios existentes en esa colección se terminen.

Para configurar un flujo de cambios que devuelva eventos de cambios que contengan la preimagen, utiliza el método de configuración de opciones full_document_before_change(). El siguiente ejemplo especifica las opciones del flujo de cambios y crea un flujo de cambios que devuelve documentos pre-imagen:

let pre_image = FullDocumentBeforeChangeType::Required;
let mut change_stream = my_coll.watch()
.full_document_before_change(pre_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Pre-image: {:?}", event.full_document_before_change);
}

El ejemplo anterior pasa un valor de FullDocumentBeforeChangeType::Required al método del generador de opciones full_document_before_change(). Este método configura el flujo de cambios para exigir pre-imágenes en los eventos de cambio de reemplazo, actualizar y borrar. Si la pre-imagen no está disponible, el controlador genera un error.

Si actualizas un documento en el que el valor de name es "Jane Campion", el evento de cambio produce el siguiente resultado:

Operation performed: Update
Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano",
"Bright Star"], oscar_noms: 5 })

Para configurar un flujo de cambios que devuelva eventos de cambio que contengan la imagen posterior, utiliza el método builder de la opción full_document(). El siguiente ejemplo especifica las opciones de flujo de cambios y crea un flujo de cambios que devuelve documentos post-imagen:

let post_image = FullDocumentType::WhenAvailable;
let mut change_stream = my_coll.watch()
.full_document(post_image)
.await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Post-image: {:?}", event.full_document);
}

El ejemplo anterior pasa un valor de FullDocument::WhenAvailable al método del generador de opciones full_document(). Este método configura el flujo de cambios para devolver post-imágenes para los eventos de cambios de reemplazo, actualización y eliminación si la post-imagen está disponible.

Si elimina el documento en el que el valor de name es "Todd Haynes", el evento de cambio produce la siguiente salida:

Operation performed: Delete
Post-image: None

Para obtener más información sobre cualquiera de los métodos o tipos mencionados en esta guía, consulta la siguiente documentación API:

Volver

Cursores de datos

En esta página