Overview
En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en tiempo real en su base de datos. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puede utilizar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.
Abre un flujo de cambios
Puede observar los cambios en MongoDB utilizando el watch() método en los siguientes objetos:
Para cada objeto, el watch() método abre un flujo de cambios para emitir documentos de eventos de cambio cuando ocurren.
El watch() método utiliza opcionalmente una canalización de agregación que consta de una matriz de etapas de agregación como primer parámetro. Las etapas de agregación filtran y transforman los eventos de cambio.
En el siguiente fragmento, la etapa $match coincide con todos los documentos de eventos de cambio con un valor runtime menor que 15 y filtra todos los demás.
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ]; const changeStream = myColl.watch(pipeline);
El método watch() acepta un objeto options como segundo parámetro. Consulta los enlaces al final de esta sección para obtener más información sobre la configuración que puedes realizar con este objeto.
El método watch() devuelve una instancia de un ChangeStream. Se pueden leer eventos de flujos de cambios iterando sobre ellos o escuchando eventos.
Advertencia
El controlador no admite el uso simultáneo de un ChangeStream en los modos EventEmitter y Iterator, lo que provoca un error. Esto evita un comportamiento indefinido, en el que el controlador no puede garantizar qué consumidor recibe primero los documentos.
Seleccione la pestaña que corresponde a la forma en que desea leer los eventos del flujo de cambios:
A partir de la versión 4.12, los objetos ChangeStream son iterables asíncronos. Con este cambio, se pueden usar bucles for-await para recuperar eventos de un flujo de cambios abierto:
for await (const change of changeStream) { console.log("Received change: ", change); }
Puedes llamar a métodos en el objeto ChangeStream tales como:
hasNext()Para comprobar si hay documentos restantes en la secuencianext()Para solicitar el siguiente documento en la secuenciaclose()para cerrar el ChangeStream
Puedes asociar funciones de escucha al objeto ChangeStream llamando al método on(). Este método se hereda de la clase EventEmitter de JavaScript. Pasa la cadena "change" como primer parámetro y tu función de escucha como segundo, como se muestra a continuación:
changeStream.on("change", (changeEvent) => { /* your listener function */ });
La función de escucha se activa cuando se emite un evento change. Puede especificar la lógica en la función de escucha para procesar el documento de evento de cambio al recibirlo.
Puede controlar el flujo de cambios llamando a pause() para dejar de emitir eventos o a resume() para continuar emitiendo eventos.
Para detener el procesamiento de eventos de cambio, llame al método close() en la ChangeStream instancia. Esto cierra el flujo de cambios y libera recursos.
changeStream.close();
Ejemplos
Iteración
Nota
Puede usar este ejemplo para conectarse a una instancia de MongoDB e interactuar con una base de datos que contiene datos de muestra. Para obtener más información sobre cómo conectarse a su instancia de MongoDB y cargar un conjunto de datos de muestra, consulte Comiencea utilizar la guía del controlador Node.js.
Nota
No hay características específicas de TypeScript
El siguiente ejemplo de código utiliza JavaScript. No hay características específicas de TypeScript del controlador que sean relevantes para este caso de uso.
El siguiente ejemplo abre un flujo de cambios en la colección haikus de la base de datos insertDB e imprime los eventos de cambio a medida que ocurren:
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
Tip
Gestión explícita de recursos
El controlador de Nodo.js admite de forma nativa la gestión explícita de recursos para MongoClient, ClientSession, ChangeStreams y cursores. Esta característica es experimental y está sujeta a cambios. Para aprender a utilizar la gestión explícita de recursos, consulta las Notas de versión v6.9.
Cuando ejecuta este código y luego realiza un cambio en la colección haikus, como realizar una operación de inserción o eliminación, puede ver el documento del evento de cambio impreso en su terminal.
Por ejemplo, si inserta un documento en la colección, el código imprime la siguiente salida:
Received change: { _id: { _data: '...' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1675800603, i: 31 }), fullDocument: { _id: new ObjectId("..."), ... }, ns: { db: 'insertDB', coll: 'haikus' }, documentKey: { _id: new ObjectId("...") } }
Nota
Recibir documentos completos de Actualizaciones
Los eventos de cambio que contienen información sobre operaciones de actualización solo devuelven los campos modificados por defecto, en lugar del documento actualizado completo. Puede configurar su flujo de cambios para que también devuelva la versión más reciente del documento estableciendo el campo fullDocument del objeto de opciones en "updateLookup", como se indica a continuación:
const options = { fullDocument: "updateLookup" }; // This could be any pipeline. const pipeline = []; const changeStream = myColl.watch(pipeline, options);
Función de oyente
El siguiente ejemplo abre un flujo de cambios en la colección haikus de la base de datos insertDB. Cree una función de escucha para recibir e imprimir los eventos de cambio que ocurren en la colección.
Primero, abra el flujo de cambios en la colección y luego defina un receptor en él mediante el método on(). Una vez configurado el receptor, genere un evento de cambio realizando un cambio en la colección.
Para generar el evento de cambio en la colección, utilizaremos el método insertOne() para añadir un nuevo documento. Dado que insertOne() puede ejecutarse antes de que la función oyente pueda registrarse, usamos un temporizador, definido como simulateAsyncPause, para esperar 1 segundo antes de ejecutar la inserción.
También usamos simulateAsyncPause después de insertar el documento. Esto proporciona tiempo suficiente para que la función de escucha reciba el evento de cambio y complete su ejecución antes de cerrar la instancia ChangeStream mediante el método close().
Nota
Razón para incluir temporizadores
Los temporizadores utilizados en este ejemplo son solo para fines demostrativos. Garantizan que haya tiempo suficiente para registrar el receptor y que este procese el evento de cambio antes de salir.
Nota
No hay características específicas de TypeScript
El siguiente ejemplo de código utiliza JavaScript. No hay características específicas de TypeScript del controlador que sean relevantes para este caso de uso.
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
Visite los siguientes recursos para obtener más material sobre las clases y métodos mencionados en esta página: