Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Supervise los datos con Change Streams

En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar los cambios en tiempo real de tu base de datos. Un flujo de cambios es una funcionalidad de MongoDB Server que permite que la aplicación se suscriba a cambios en los datos de una colección, base de datos o implementación.

Tip

Atlas Stream Processing

Como alternativa a los flujos de cambios, puedes usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de base de datos, Atlas Stream Processing administra múltiples tipos de eventos de datos y ofrece capacidades avanzadas de procesamiento de datos. Para aprender más información sobre esta funcionalidad, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.

Puedes estar atento a los cambios en MongoDB usando el watch() método en los siguientes objetos:

Para cada objeto, el método watch() abre un flujo de cambios para emitir documentos de evento de cambio cuando estos ocurran.

El método watch() opcionalmente toma un pipeline de agregación que consiste en un arreglo de etapas de agregación como primer parámetro. Las etapas de agregación filtran y transforman los eventos de cambio.

En el siguiente snippet, la etapa $match coincide con todos los documentos de eventos de cambio que tienen un valor runtime menor que 15, filtrando todos los demás.

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

El método watch() acepta un objeto options como segundo parámetro. Consulta los enlaces al final de esta sección para obtener más información sobre la configuración que puedes realizar con este objeto.

El método watch() devuelve una instancia de un ChangeStream. Se pueden leer eventos de flujos de cambios iterando sobre ellos o escuchando eventos.

Advertencia

El controlador no admite el uso simultáneo de un ChangeStream en los modos EventEmitter y Iterator, lo que provoca un error. Esto evita un comportamiento indefinido, en el que el controlador no puede garantizar qué consumidor recibe primero los documentos.

Seleccione la pestaña que corresponde a la forma en que desea leer los eventos del flujo de cambios:

A partir de la versión 4.12, los objetos ChangeStream son iterables asíncronos. Con este cambio, se pueden usar bucles for-await para recuperar eventos de un flujo de cambios abierto:

for await (const change of changeStream) {
console.log("Received change: ", change);
}

Puedes llamar a métodos en el objeto ChangeStream tales como:

  • hasNext() Para comprobar si hay documentos restantes en la secuencia

  • next() solicitar el siguiente documento en la secuencia

  • close() para cerrar el ChangeStream

Se pueden adjuntar funciones de escucha al objeto ChangeStream llamando al método on(). Este método se hereda de la clase EventEmitter de JavaScript. Pasa la string "change" como el primer parámetro y tu función de escucha como el segundo parámetro, como se muestra a continuación:

changeStream.on("change", (changeEvent) => { /* your listener function */ });

La función del listener se activa cuando se emite un evento change. Puedes especificar una lógica en el listener para procesar el documento de evento de cambio cuando lo recibas.

Puedes controlar el flujo de cambios llamando a pause() para dejar de emitir eventos o a resume() para continuar emitiendo eventos.

Para detener el procesamiento de eventos de cambio, llame al método close() en la ChangeStream instancia. Esto cierra el flujo de cambios y libera recursos.

changeStream.close();

Nota

Puede utilizar este ejemplo para conectarse a una instancia de MongoDB e interactuar con una base de datos que contiene datos de muestra. Para obtener más información sobre cómo conectarte a tu instancia de MongoDB y cargar un conjunto de datos de muestra, consulta Comiencea utilizar la guía del controlador Node.js.

Nota

No hay características específicas de TypeScript

El siguiente ejemplo de código utiliza JavaScript. No hay características específicas de TypeScript del controlador que sean relevantes para este caso de uso.

El siguiente ejemplo abre un flujo de cambios en la colección haikus de la base de datos insertDB y muestra los eventos de cambio a medida que ocurren:

1// Watch for changes in a collection by using a change stream
2import { MongoClient } from "mongodb";
3
4// Replace the uri string with your MongoDB deployment's connection string.
5const uri = "<connection string uri>";
6
7const client = new MongoClient(uri);
8
9// Declare a variable to hold the change stream
10let changeStream;
11
12// Define an asynchronous function to manage the change stream
13async function run() {
14 try {
15 const database = client.db("insertDB");
16 const haikus = database.collection("haikus");
17
18 // Open a Change Stream on the "haikus" collection
19 changeStream = haikus.watch();
20
21 // Print change events as they occur
22 for await (const change of changeStream) {
23 console.log("Received change:\n", change);
24 }
25 // Close the change stream when done
26 await changeStream.close();
27
28 } finally {
29 // Close the MongoDB client connection
30 await client.close();
31 }
32}
33run().catch(console.dir);

Tip

Gestión explícita de recursos

El controlador de Nodo.js admite de forma nativa la gestión explícita de recursos para MongoClient, ClientSession, ChangeStreams y cursores. Esta característica es experimental y está sujeta a cambios. Para aprender a utilizar la gestión explícita de recursos, consulta las Notas de versión v6.9.

Cuando ejecutes este código y luego realices un cambio en la colección haikus, como realizar una operación de inserción o eliminación, podrás ver el documento del evento de cambio impreso en tu terminal.

Por ejemplo, si se inserta un documento en la colección, el código imprime el siguiente resultado:

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

Nota

Recibir Documentos Completos de Actualizaciones

Los eventos de cambio que contienen información sobre las operaciones de actualización solo devuelven por defecto los campos modificados en lugar de todo el documento actualizado. Puedes configurar tu flujo de cambios para que también devuelva la versión más actual del documento estableciendo el campo fullDocument del objeto de opciones en "updateLookup" de la siguiente manera:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);

El siguiente ejemplo abre un flujo de cambios en la colección haikus de la base de datos insertDB. Creemos una función de escucha para recibir e imprimir los eventos de cambio que ocurren en la colección.

Primero, abre el flujo de cambios en la colección y luego define un listener en el flujo de cambios utilizando el método on(). Una vez que configures el escucha, genera un evento de cambio realizando un cambio en la colección.

Para generar el evento de cambio en la colección, utilizaremos el método insertOne() para añadir un nuevo documento. Dado que insertOne() puede ejecutarse antes de que la función oyente pueda registrarse, usamos un temporizador, definido como simulateAsyncPause, para esperar 1 segundo antes de ejecutar la inserción.

También usamos simulateAsyncPause después de la inserción del documento. Esto proporciona tiempo suficiente para que la función de escucha reciba el evento de cambio y para que el escuchador complete su ejecución antes de cerrar la instancia ChangeStream utilizando el método close().

Nota

Razón para incluir temporizadores

Los temporizadores utilizados en este ejemplo son solo para fines demostrativos. Garantizan que haya tiempo suficiente para registrar el receptor y que este procese el evento de cambio antes de salir.

Nota

No hay características específicas de TypeScript

El siguiente ejemplo de código utiliza JavaScript. No hay características específicas de TypeScript del controlador que sean relevantes para este caso de uso.

1/* Change stream listener */
2
3import { MongoClient } from "mongodb";
4
5// Replace the uri string with your MongoDB deployment's connection string
6const uri = "<connection string uri>";
7
8const client = new MongoClient(uri);
9
10const simulateAsyncPause = () =>
11 new Promise(resolve => {
12 setTimeout(() => resolve(), 1000);
13 });
14
15let changeStream;
16async function run() {
17 try {
18 const database = client.db("insertDB");
19 const haikus = database.collection("haikus");
20
21 // Open a Change Stream on the "haikus" collection
22 changeStream = haikus.watch();
23
24 // Set up a change stream listener when change events are emitted
25 changeStream.on("change", next => {
26 // Print any change event
27 console.log("received a change to the collection: \t", next);
28 });
29
30 // Pause before inserting a document
31 await simulateAsyncPause();
32
33 // Insert a new document into the collection
34 await myColl.insertOne({
35 title: "Record of a Shriveled Datum",
36 content: "No bytes, no problem. Just insert a document, in MongoDB",
37 });
38
39 // Pause before closing the change stream
40 await simulateAsyncPause();
41
42 // Close the change stream and print a message to the console when it is closed
43 await changeStream.close();
44 console.log("closed the change stream");
45 } finally {
46 // Close the database connection on completion or error
47 await client.close();
48 }
49}
50run().catch(console.dir);

Visite los siguientes recursos para obtener más material sobre las clases y métodos mencionados en esta página:

Volver

Registro

En esta página