Docs Menu
Docs Home
/ /

Monitorear datos con flujos de cambios

En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en tiempo real en su base de datos. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.

Tip

Atlas Stream Processing

Como alternativa a los flujos de cambios, puede utilizar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.

Puede observar los cambios en MongoDB utilizando el watch() método en los siguientes objetos:

Para cada objeto, el watch() método abre un flujo de cambios para emitir documentos de eventos de cambio cuando ocurren.

El watch() método utiliza opcionalmente una canalización de agregación que consta de una matriz de etapas de agregación como primer parámetro. Las etapas de agregación filtran y transforman los eventos de cambio.

En el siguiente fragmento, la etapa $match coincide con todos los documentos de eventos de cambio con un valor runtime menor que 15 y filtra todos los demás.

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

El método watch() acepta un objeto options como segundo parámetro. Consulta los enlaces al final de esta sección para obtener más información sobre la configuración que puedes realizar con este objeto.

El método watch() devuelve una instancia de un ChangeStream. Se pueden leer eventos de flujos de cambios iterando sobre ellos o escuchando eventos.

Advertencia

El controlador no admite el uso simultáneo de un ChangeStream en los modos EventEmitter y Iterator, lo que provoca un error. Esto evita un comportamiento indefinido, en el que el controlador no puede garantizar qué consumidor recibe primero los documentos.

Seleccione la pestaña que corresponde a la forma en que desea leer los eventos del flujo de cambios:

A partir de la versión 4.12, los objetos ChangeStream son iterables asíncronos. Con este cambio, se pueden usar bucles for-await para recuperar eventos de un flujo de cambios abierto:

for await (const change of changeStream) {
console.log("Received change: ", change);
}

Puedes llamar a métodos en el objeto ChangeStream tales como:

  • hasNext() Para comprobar si hay documentos restantes en la secuencia

  • next() Para solicitar el siguiente documento en la secuencia

  • close() para cerrar el ChangeStream

Puedes asociar funciones de escucha al objeto ChangeStream llamando al método on(). Este método se hereda de la clase EventEmitter de JavaScript. Pasa la cadena "change" como primer parámetro y tu función de escucha como segundo, como se muestra a continuación:

changeStream.on("change", (changeEvent) => { /* your listener function */ });

La función de escucha se activa cuando se emite un evento change. Puede especificar la lógica en la función de escucha para procesar el documento de evento de cambio al recibirlo.

Puede controlar el flujo de cambios llamando a pause() para dejar de emitir eventos o a resume() para continuar emitiendo eventos.

Para detener el procesamiento de eventos de cambio, llame al método close() en la ChangeStream instancia. Esto cierra el flujo de cambios y libera recursos.

changeStream.close();

Nota

Puede usar este ejemplo para conectarse a una instancia de MongoDB e interactuar con una base de datos que contiene datos de muestra. Para obtener más información sobre cómo conectarse a su instancia de MongoDB y cargar un conjunto de datos de muestra, consulte Comiencea utilizar la guía del controlador Node.js.

Nota

No hay características específicas de TypeScript

El siguiente ejemplo de código utiliza JavaScript. No hay características específicas de TypeScript del controlador que sean relevantes para este caso de uso.

El siguiente ejemplo abre un flujo de cambios en la colección haikus de la base de datos insertDB e imprime los eventos de cambio a medida que ocurren:

1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);

Tip

Gestión explícita de recursos

El controlador de Nodo.js admite de forma nativa la gestión explícita de recursos para MongoClient, ClientSession, ChangeStreams y cursores. Esta característica es experimental y está sujeta a cambios. Para aprender a utilizar la gestión explícita de recursos, consulta las Notas de versión v6.9.

Cuando ejecuta este código y luego realiza un cambio en la colección haikus, como realizar una operación de inserción o eliminación, puede ver el documento del evento de cambio impreso en su terminal.

Por ejemplo, si inserta un documento en la colección, el código imprime la siguiente salida:

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

Nota

Recibir documentos completos de Actualizaciones

Los eventos de cambio que contienen información sobre operaciones de actualización solo devuelven los campos modificados por defecto, en lugar del documento actualizado completo. Puede configurar su flujo de cambios para que también devuelva la versión más reciente del documento estableciendo el campo fullDocument del objeto de opciones en "updateLookup", como se indica a continuación:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

El siguiente ejemplo abre un flujo de cambios en la colección haikus de la base de datos insertDB. Cree una función de escucha para recibir e imprimir los eventos de cambio que ocurren en la colección.

Primero, abra el flujo de cambios en la colección y luego defina un receptor en él mediante el método on(). Una vez configurado el receptor, genere un evento de cambio realizando un cambio en la colección.

Para generar el evento de cambio en la colección, utilizaremos el método insertOne() para añadir un nuevo documento. Dado que insertOne() puede ejecutarse antes de que la función oyente pueda registrarse, usamos un temporizador, definido como simulateAsyncPause, para esperar 1 segundo antes de ejecutar la inserción.

También usamos simulateAsyncPause después de insertar el documento. Esto proporciona tiempo suficiente para que la función de escucha reciba el evento de cambio y complete su ejecución antes de cerrar la instancia ChangeStream mediante el método close().

Nota

Razón para incluir temporizadores

Los temporizadores utilizados en este ejemplo son solo para fines demostrativos. Garantizan que haya tiempo suficiente para registrar el receptor y que este procese el evento de cambio antes de salir.

Nota

No hay características específicas de TypeScript

El siguiente ejemplo de código utiliza JavaScript. No hay características específicas de TypeScript del controlador que sean relevantes para este caso de uso.

1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);

Visite los siguientes recursos para obtener más material sobre las clases y métodos mencionados en esta página:

Volver

Registro

En esta página