Overview
En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar los cambios en tiempo real de tu base de datos. Un flujo de cambios es una funcionalidad de MongoDB Server que permite que la aplicación se suscriba a cambios en los datos de una colección, base de datos o implementación.
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puedes usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de base de datos, Atlas Stream Processing administra múltiples tipos de eventos de datos y ofrece capacidades avanzadas de procesamiento de datos. Para aprender más información sobre esta funcionalidad, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.
Abre un flujo de cambios
Puedes estar atento a los cambios en MongoDB usando el watch() método en los siguientes objetos:
Para cada objeto, el método watch() abre un flujo de cambios para emitir documentos de evento de cambio cuando estos ocurran.
El método watch() opcionalmente toma un pipeline de agregación que consiste en un arreglo de etapas de agregación como primer parámetro. Las etapas de agregación filtran y transforman los eventos de cambio.
En el siguiente snippet, la etapa $match coincide con todos los documentos de eventos de cambio que tienen un valor runtime menor que 15, filtrando todos los demás.
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ]; const changeStream = myColl.watch(pipeline);
El método watch() acepta un objeto options como segundo parámetro. Consulta los enlaces al final de esta sección para obtener más información sobre la configuración que puedes realizar con este objeto.
El método watch() devuelve una instancia de un ChangeStream. Se pueden leer eventos de flujos de cambios iterando sobre ellos o escuchando eventos.
Advertencia
El controlador no admite el uso simultáneo de un ChangeStream en los modos EventEmitter y Iterator, lo que provoca un error. Esto evita un comportamiento indefinido, en el que el controlador no puede garantizar qué consumidor recibe primero los documentos.
Seleccione la pestaña que corresponde a la forma en que desea leer los eventos del flujo de cambios:
A partir de la versión 4.12, los objetos ChangeStream son iterables asíncronos. Con este cambio, se pueden usar bucles for-await para recuperar eventos de un flujo de cambios abierto:
for await (const change of changeStream) { console.log("Received change: ", change); }
Puedes llamar a métodos en el objeto ChangeStream tales como:
hasNext()Para comprobar si hay documentos restantes en la secuencianext()solicitar el siguiente documento en la secuenciaclose()para cerrar el ChangeStream
Se pueden adjuntar funciones de escucha al objeto ChangeStream llamando al método on(). Este método se hereda de la clase EventEmitter de JavaScript. Pasa la string "change" como el primer parámetro y tu función de escucha como el segundo parámetro, como se muestra a continuación:
changeStream.on("change", (changeEvent) => { /* your listener function */ });
La función del listener se activa cuando se emite un evento change. Puedes especificar una lógica en el listener para procesar el documento de evento de cambio cuando lo recibas.
Puedes controlar el flujo de cambios llamando a pause() para dejar de emitir eventos o a resume() para continuar emitiendo eventos.
Para detener el procesamiento de eventos de cambio, llame al método close() en la ChangeStream instancia. Esto cierra el flujo de cambios y libera recursos.
changeStream.close();
Ejemplos
Iteración
Nota
Puede utilizar este ejemplo para conectarse a una instancia de MongoDB e interactuar con una base de datos que contiene datos de muestra. Para obtener más información sobre cómo conectarte a tu instancia de MongoDB y cargar un conjunto de datos de muestra, consulta Comiencea utilizar la guía del controlador Node.js.
Nota
No hay características específicas de TypeScript
El siguiente ejemplo de código utiliza JavaScript. No hay características específicas de TypeScript del controlador que sean relevantes para este caso de uso.
El siguiente ejemplo abre un flujo de cambios en la colección haikus de la base de datos insertDB y muestra los eventos de cambio a medida que ocurren:
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
Tip
Gestión explícita de recursos
El controlador de Nodo.js admite de forma nativa la gestión explícita de recursos para MongoClient, ClientSession, ChangeStreams y cursores. Esta característica es experimental y está sujeta a cambios. Para aprender a utilizar la gestión explícita de recursos, consulta las Notas de versión v6.9.
Cuando ejecutes este código y luego realices un cambio en la colección haikus, como realizar una operación de inserción o eliminación, podrás ver el documento del evento de cambio impreso en tu terminal.
Por ejemplo, si se inserta un documento en la colección, el código imprime el siguiente resultado:
Received change: { _id: { _data: '...' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1675800603, i: 31 }), fullDocument: { _id: new ObjectId("..."), ... }, ns: { db: 'insertDB', coll: 'haikus' }, documentKey: { _id: new ObjectId("...") } }
Nota
Recibir Documentos Completos de Actualizaciones
Los eventos de cambio que contienen información sobre las operaciones de actualización solo devuelven por defecto los campos modificados en lugar de todo el documento actualizado. Puedes configurar tu flujo de cambios para que también devuelva la versión más actual del documento estableciendo el campo fullDocument del objeto de opciones en "updateLookup" de la siguiente manera:
const options = { fullDocument: "updateLookup" }; // This could be any pipeline. const pipeline = []; const changeStream = myColl.watch(pipeline, options);
Función de escucha
El siguiente ejemplo abre un flujo de cambios en la colección haikus de la base de datos insertDB. Creemos una función de escucha para recibir e imprimir los eventos de cambio que ocurren en la colección.
Primero, abre el flujo de cambios en la colección y luego define un listener en el flujo de cambios utilizando el método on(). Una vez que configures el escucha, genera un evento de cambio realizando un cambio en la colección.
Para generar el evento de cambio en la colección, utilizaremos el método insertOne() para añadir un nuevo documento. Dado que insertOne() puede ejecutarse antes de que la función oyente pueda registrarse, usamos un temporizador, definido como simulateAsyncPause, para esperar 1 segundo antes de ejecutar la inserción.
También usamos simulateAsyncPause después de la inserción del documento. Esto proporciona tiempo suficiente para que la función de escucha reciba el evento de cambio y para que el escuchador complete su ejecución antes de cerrar la instancia ChangeStream utilizando el método close().
Nota
Razón para incluir temporizadores
Los temporizadores utilizados en este ejemplo son solo para fines demostrativos. Garantizan que haya tiempo suficiente para registrar el receptor y que este procese el evento de cambio antes de salir.
Nota
No hay características específicas de TypeScript
El siguiente ejemplo de código utiliza JavaScript. No hay características específicas de TypeScript del controlador que sean relevantes para este caso de uso.
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
Visite los siguientes recursos para obtener más material sobre las clases y métodos mencionados en esta página: