Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Supervise los datos con Change Streams

En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar los cambios en tiempo real de tu base de datos. Un flujo de cambios es una funcionalidad de MongoDB Server que permite que la aplicación se suscriba a cambios en los datos de una colección, base de datos o implementación.

Un flujo de cambios genera nuevos eventos de cambio, proporcionando acceso a cambios de datos en tiempo real. Puedes abrir un flujo de cambios en una colección, base de datos u objeto cliente.

Tip

Atlas Stream Processing

Como alternativa a los flujos de cambios, puedes usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de base de datos, Atlas Stream Processing administra múltiples tipos de eventos de datos y ofrece capacidades avanzadas de procesamiento de datos. Para aprender más información sobre esta funcionalidad, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.

Los ejemplos en esta guía utilizan lo siguiente Course Estructura como modelo para los documentos de la colección courses:

type Course struct {
Title string
Enrollment int32
}

Para ejecutar los ejemplos de esta guía, cargue estos documentos en la colección courses de la base de datos db utilizando el siguiente snippet:

coll := client.Database("db").Collection("courses")
docs := []any{
Course{Title: "World Fiction", Enrollment: 35},
Course{Title: "Abstract Algebra", Enrollment: 60},
}
result, err := coll.InsertMany(context.TODO(), docs)

Tip

Bases de datos y colecciones inexistentes

Si la base de datos y la colección necesarias no existen cuando realizas una operación de guardar, el servidor las crea implícitamente.

Cada documento contiene una descripción de una asignatura universitaria que incluye el título y la cantidad máxima de inscritos, correspondientes a los campos title y enrollment en cada documento.

Nota

Cada salida de ejemplo muestra valores truncados de _data, clusterTime y ObjectID porque el controlador los genera de manera única.

Puede observar cambios en MongoDB utilizando el método Watch() en los siguientes objetos:

  • Colección: supervisar los cambios en una colección específica

  • Base de datos: Supervisa los cambios en todas las colecciones de una base de datos

  • MongoClient: Supervise los cambios en todas las bases de datos

Para cada objeto, el método Watch() abre un flujo de cambios para emitir documentos de eventos de cambio cuando ocurren.

El método Watch() requiere un parámetro de contexto y un parámetro de canalización. Para devolver todos los cambios, pase un objeto Pipeline vacío.

El método Watch() utiliza opcionalmente una canalización de agregación que consta de una matriz de etapas de agregación como primer parámetro. Las etapas de agregación filtran y transforman los eventos de cambio.

El siguiente ejemplo abre un flujo de cambios en la colección courses e imprime los eventos de flujo de cambios a medida que ocurren:

changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

Si modifica la colección courses en un programa o shell independiente, este código imprime los cambios a medida que se producen. Insertar un documento con un valor title de "Advanced Screenwriting" y un valor enrollment de 20 genera el siguiente evento de cambio:

map[_id:map[_data:...] clusterTime: {...}
documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...")
enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db]
operationType:insert]

Para ver un ejemplo completamente ejecutable, consulta la sección Abrir un ejemplo de Change Stream: Archivo completo de esta guía.

Utiliza el parámetro de pipeline para modificar la salida del flujo de cambios. Este parámetro te permite observar solo ciertos eventos de cambio. Da formato al parámetro de pipeline como un arreglo de documentos, donde cada documento representa una etapa de agregación.

Puedes utilizar las siguientes etapas de pipeline en este parámetro:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

El siguiente ejemplo abre un flujo de cambios en la base de datos db, pero solo observa nuevas operaciones de eliminación:

db := client.Database("db")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the delete operation change events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

Nota

Se llamó el método Watch() en la base de datos db, por lo tanto, el código genera nuevas operaciones de eliminación en cualquier colección dentro de esta base de datos.

Nota

Configuración de ejemplo

Este ejemplo se conecta a una instancia de MongoDB utilizando un URI de conexión. Para obtener más información sobre cómo conectar con su instancia de MongoDB, consulte Crear un MongoClient. Este ejemplo también utiliza la colección restaurants en la base de datos sample_restaurants incluida en los conjuntos de datos de muestra de Atlas. Puedes cargarlos en tu base de datos en el nivel gratuito de MongoDB Atlas siguiendo los pasos de Comenzar con Atlas.

El siguiente ejemplo abre un flujo de cambios en la colección restaurants e imprime los documentos insertados:

coll := client.Database("sample_restaurants").Collection("restaurants")
// Creates instructions to watch for insert operations
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}}
// Creates a change stream that receives change events
cs, err := coll.Watch(context.TODO(), pipeline)
if err != nil {
panic(err)
}
defer cs.Close(context.TODO())
fmt.Println("Waiting For Change Events. Insert something in MongoDB!")
// Prints a message each time the change stream receives an event
for cs.Next(context.TODO()) {
var event bson.M
if err := cs.Decode(&event); err != nil {
panic(err)
}
output, err := json.MarshalIndent(event["fullDocument"], "", " ")
if err != nil {
panic(err)
}
fmt.Printf("%s\n", output)
}
if err := cs.Err(); err != nil {
panic(err)
}

Vea un ejemplo completamente ejecutable.

Después de ejecutar el ejemplo completo, ejecute el ejemplo de archivo completo "Insertar un documento" en un shell diferente. Al ejecutar la operación de inserción, verá un resultado similar al siguiente:

// results truncated {
"_id": ..., "name": "8282", "cuisine": "Korean"
}

Importante

Cuando termines de trabajar con este ejemplo de uso, asegúrate de apagarlo cerrando tu terminal.

Utiliza el parámetro options para modificar el comportamiento del método Watch().

Puede especificar las siguientes opciones para el método Watch():

  • ResumeAfter

  • StartAfter

  • FullDocument

  • FullDocumentBeforeChange

  • BatchSize

  • MaxAwaitTime

  • Collation

  • StartAtOperationTime

  • Comment

  • ShowExpandedEvents

  • Custom

  • CustomPipeline

Para más información sobre estas opciones, consulte db.collection.watch() entrada en el manual del servidor.

Cuando realices cualquier operación CRUD en una colección, por defecto, el documento de evento de cambio correspondiente solo contiene el delta de los campos modificados por la operación. Puedes ver el documento completo antes y después de un cambio, además del delta, especificando la configuración en el parámetro options del método Watch().

Si desea ver la imagen posterior de un documento, la versión completa del documento después de un cambio, establezca el FullDocument campo del options parámetro en uno de los siguientes valores:

  • UpdateLookup: El documento del evento de cambio incluye una copia de todo el documento modificado.

  • WhenAvailable:El documento de evento de cambio incluye una imagen posterior del documento modificado para eventos de cambio si la imagen posterior está disponible.

  • RequiredLa salida es la misma que para WhenAvailable, pero el controlador genera un error del lado del servidor si la imagen posterior no está disponible.

Si deseas ver la pre-imagen de un documento, la versión completa del documento antes de un cambio, establece el campo FullDocumentBeforeChange del parámetro options en uno de los siguientes valores:

  • WhenAvailable:El documento de evento de cambio incluye una imagen previa del documento modificado para eventos de cambio si la imagen previa está disponible.

  • Required: La salida es la misma que para WhenAvailable, pero el controlador genera un error del lado del servidor si la preimagen no está disponible.

Importante

Para acceder a las imágenes previas y posteriores del documento, debe habilitar changeStreamPreAndPostImages para la colección. Consulte la sección "Flujos de cambios" de la guía del comando de base de datos collMod en el manual del servidor MongoDB para obtener instrucciones y más información.

Nota

No hay preimagen para un documento insertado ni postimagen para un documento eliminado.

El siguiente ejemplo llama al método Watch() en la colección courses. Especifica un valor para el campo FullDocument del parámetro options para generar una copia de todo el documento modificado, en lugar de solo los campos modificados:

opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

Actualizar el valor de enrollment del documento con el title de "World Fiction" de 35 a 30 produce el siguiente evento de cambio:

{"_id": {"_data": "..."},"operationType": "update","clusterTime":
{"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id":
{"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}},
"ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}},
"updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}

Sin especificar la opción FullDocument, la misma operación de actualización ya no genera el valor "fullDocument" en el documento del evento de cambio.

Para obtener más información sobre Change Streams, consulta Change Streams en el Manual del servidor.

Para obtener más información sobre el método Watch(), consulte la siguiente documentación de API:

Volver

Registro

En esta página