Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Abrir Change Streams

En esta guía, puedes aprender a usar un flujo de cambios para supervisar los cambios en tiempo real de tus datos. Un flujo de cambios es una funcionalidad del MongoDB Server que permite a tu aplicación suscribirse a los cambios de datos en una única colección, base de datos o implementación.

Puedes abrir un flujo de cambios para ayudar a realizar las siguientes acciones:

  • Permitir que los dispositivos rastreen y reaccionen a eventos, como cámaras detectores de movimiento

  • Cree una aplicación que supervise los cambios en los precios de las acciones

  • Generar un registro de la actividad de los empleados para puestos de trabajo específicos

Puedes especificar un conjunto de operadores de agregación para filtrar y transformar los datos que recibe tu aplicación. Cuando te conectes a una implementación de MongoDB v6.0 o posterior, también puedes configurar los eventos para que incluyan los datos del documento antes y después del cambio.

Para aprender a abrir y configurar un flujo de cambios, navega a las siguientes secciones:

  • Datos de muestra

  • Abre un flujo de cambios

  • Aplicar operadores de agregación al flujo de cambios

  • Incluir imágenes preoperativas y postoperatorias

  • Información Adicional

Los ejemplos de esta guía supervisan los cambios en el directors colección. Suponga que esta colección contiene los siguientes documentos, que están modelados como estructuras:

let docs = vec! [
Director {
name: "Todd Haynes".to_string(),
movies: vec! ["Far From Heaven".to_string(), "Carol".to_string()],
oscar_noms: 1,
},
Director {
name: "Jane Campion".to_string(),
movies: vec! ["The Piano".to_string(), "Bright Star".to_string()],
oscar_noms: 5,
}
];

Tip

Para aprender a insertar documentos en una colección, consulte la Guía de Insertar documentos.

Puedes abrir un flujo de cambios para suscribirte a tipos específicos de cambios de datos y producir eventos de cambios en tu aplicación.

Para abrir un flujo de cambios, llama al método watch() en una instancia de Collection, Database o Client.

Importante

Las implementaciones autónomas de MongoDB no admiten flujos de cambios porque la funcionalidad requiere un oplog de set de réplicas. Para obtener más información sobre el oplog, consulta la página Oplog de Sets de réplicas en el manual del servidor.

El tipo de estructura sobre el que llames el watch() método determina el alcance de los eventos para los que el flujo de cambios escucha. La siguiente tabla describe el comportamiento del método watch() según el objeto que lo llama:

Tipo de struct
Comportamiento de watch()

Collection

Monitors changes to the individual collection

Database

Monitors changes to all collections in the database

Client

Monitors all changes in the connected MongoDB deployment

El siguiente ejemplo abre un flujo de cambios en la colección directors. El código imprime el tipo de operación y el documento modificado de cada evento de cambio accediendo a los campos operation_type y full_document de cada instancia ChangeStreamEvent:

let mut change_stream = my_coll.watch(None, None).await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Document: {:?}", event.full_document);
}

Tip

Para obtener una lista completa de los campos de la estructura ChangeStreamEvent, consulta ChangeStreamEvent en la documentación de la API.

Si inserta un documento en la colección directors en el que el valor name es "Wes Anderson", el código anterior produce el siguiente resultado:

Operation performed: Insert
Document: Some(Director { name: "Wes Anderson", movies: [...], oscar_noms: 7 })

Puedes pasar una pipeline de agregación como parámetro al método watch() para especificar qué eventos de cambio recibe el flujo de cambios.

Para aprender qué operadores de agregación admite tu versión de MongoDB Server, consulta Modificar salida de flujo de cambios en el manual del servidor.

El siguiente ejemplo crea una pipeline de agregación para filtrar operaciones de actualización. Luego, el código pasa el pipeline al método watch(), configurando el flujo de cambios para que solo reciba e imprima eventos de cambios para operaciones de actualización:

let pipeline = vec![
doc! { "$match" : doc! { "operationType" : "update" } }
];
let mut update_change_stream = my_coll.watch(pipeline, None).await?;
while let Some(event) = update_change_stream.next().await.transpose()? {
println!("Update performed: {:?}", event.update_description);
}

Si actualizas el documento en el que el valor name es "Todd Haynes" aumentando el valor del campo oscar_noms, el siguiente código produce la siguiente salida:

Update performed: Some(UpdateDescription { updated_fields: Document({
"oscar_noms": Int64(2)}), removed_fields: [], truncated_arrays: Some([]) })

Tip

Para aprender a realizar operaciones de actualización, consulta la guía Modificar documentos.

Puedes configurar el evento de cambio para que contenga u omita los siguientes datos:

  • Preimagen: un documento que representa la versión del documento antes de la operación, si existe

  • Post-imagen: un documento que representa la versión del documento después de la operación, si existe

Importante

Solo puedes habilitar preimágenes y postimágenes en colecciones si tu implementación utiliza MongoDB v6.0 o posterior.

Para recibir eventos de flujo de cambios que incluyan una preimagen o una postimagen, debes realizar las siguientes acciones:

  • Habilita las preimágenes y las postimágenes para la colección en tu implementación de MongoDB.

    Tip

    Para obtener instrucciones sobre cómo habilitar imágenes pre/post en la implementación, consulte Change Streams con imágenes pre/post en documentos en el manual del servidor.

    Para aprender a indicar al controlador que cree una colección con preimágenes y postimágenes habilitadas, consulta la sección Crea una colección con preimágenes y postimágenes habilitadas de esta página.

  • Configura tu flujo de cambios para recuperar o bien las preimágenes, las postimágenes o ambas. Durante esta configuración, se puede indicar al controlador que requiera imágenes previas y posteriores o incluirlas solo cuando estén disponibles.

    Tip

    Para configurar tu flujo de cambio para registrar la imagen previa en los eventos de cambio, consulta el Ejemplo de configuración de imagen previa en esta página.

    Para configurar tu flujo de cambios para grabar la posimagen en los eventos de cambio, consulta el Ejemplo de Configuración de la Posimagen en esta página.

Para crear una colección con los documentos de preimagen y documentos post-imagen habilitados, configure esta opción en una instancia de estructura CreateCollectionOptions. Para empezar a crear una instancia CreateCollectionOptions, llama al método CreateCollectionOptions::builder().

Nota

Opciones de instanciación

El controlador Rust implementa el patrón de diseño Builder para la creación de muchos tipos diferentes, incluidos CreateCollectionOptions. Se puede usar el método builder() para construir una instancia de cada tipo encadenando métodos del generador de opciones.

Al configurar tu instancia de CreateCollectionOptions, debes utilizar el método builder change_stream_pre_and_post_images(). El siguiente ejemplo utiliza este método constructor para especificar opciones de colección y crea una colección para la que están disponibles imágenes previas y posteriores:

let enable = ChangeStreamPreAndPostImages::builder().enabled(true).build();
let opts = CreateCollectionOptions::builder()
.change_stream_pre_and_post_images(enable)
.build();
let result = my_db.create_collection("directors", opts).await?;

Puedes cambiar la opción de preimagen y postimagen en una colección existente ejecutando el comando collMod desde MongoDB Shell o desde tu aplicación. Para aprender cómo realizar esta operación, consulta la guía Ejecutar un comando y la entrada sobre collMod en el manual del servidor.

Advertencia

Si habilitaste preimágenes o postimágenes en una colección, modificar esta configuración con collMod puede provocar que los flujos de cambios existentes en esa colección se terminen.

Para configurar un flujo de cambios que devuelva eventos de cambio que contengan la preimagen, especifica una instancia de ChangeStreamOptions. Para comenzar a crear una instancia de ChangeStreamOptions, llama al método ChangeStreamOptions::builder().

Al configurar tu instancia ChangeStreamOptions para que devuelva el documento pre-imagen, debes usar el método constructor full_document_before_change(). El siguiente ejemplo especifica las opciones del flujo de cambios y crea un flujo de cambios que devuelve documentos pre-imagen:

let pre_image = Some(FullDocumentBeforeChangeType::Required);
let opts = ChangeStreamOptions::builder()
.full_document_before_change(pre_image)
.build();
let mut change_stream = my_coll.watch(None, opts).await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Pre-image: {:?}", event.full_document_before_change);
}

El ejemplo anterior pasa un valor de FullDocumentBeforeChangeType::Required al método constructor full_document_before_change(). Esta opción configura el flujo de cambios para que requiera preimágenes para los eventos de cambio de reemplazo, actualización y eliminación. Si la pre-imagen no está disponible, el controlador genera un error.

Si actualizas un documento en el que el valor de name es "Jane Campion", el evento de cambio produce el siguiente resultado:

Operation performed: Update
Pre-image: Some(Director { name: "Jane Campion", movies: ["The Piano",
"Bright Star"], oscar_noms: 5 })

Para configurar un flujo de cambios que devuelva eventos de cambio que contengan la postimagen, especifica una instancia de la estructura ChangeStreamOptions. Para empezar a construir una instancia de ChangeStreamOptions, llame al método ChangeStreamOptions::builder().

Cuando configures tu instancia de ChangeStreamOptions para devolver el documento post-imagen, debes usar el método full_document() builder. El siguiente ejemplo especifica las opciones de flujo de cambios y crea un flujo de cambios que devuelve documentos post-imagen:

let post_image = Some(FullDocumentType::WhenAvailable);
let opts = ChangeStreamOptions::builder()
.full_document(post_image)
.build();
let mut change_stream = my_coll.watch(None, opts).await?;
while let Some(event) = change_stream.next().await.transpose()? {
println!("Operation performed: {:?}", event.operation_type);
println!("Post-image: {:?}", event.full_document);
}

El ejemplo anterior pasa un valor de FullDocument::WhenAvailable al método constructor full_document(). Esta opción configura el flujo de cambios para que devuelva posimágenes para eventos de cambio de reemplazo, actualización y eliminación si la posimagen está disponible.

Si elimina el documento en el que el valor de name es "Todd Haynes", el evento de cambio produce la siguiente salida:

Operation performed: Delete
Post-image: None

Para obtener más información sobre cualquiera de los métodos o tipos mencionados en esta guía, consulta la siguiente documentación API:

Volver

Cursores de datos

En esta página