Overview
En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en tiempo real en su base de datos. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.
Un flujo de cambios genera nuevos eventos de cambio, lo que proporciona acceso a cambios de datos en tiempo real. Puede abrir un flujo de cambios en una colección, una base de datos o un objeto cliente.
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puede utilizar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.
Datos de muestra
Los ejemplos de esta guía utilizan lo siguiente: Course Estructura como modelo para los documentos de la colección courses:
type Course struct { Title string Enrollment int32 }
Para ejecutar los ejemplos de esta guía, cargue estos documentos en la colección courses de la base de datos db utilizando el siguiente fragmento:
coll := client.Database("db").Collection("courses") docs := []any{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
Tip
Bases de datos y colecciones inexistentes
Si la base de datos y la colección necesarias no existen cuando realiza una operación de escritura, el servidor las crea implícitamente.
Cada documento contiene una descripción de un curso universitario que incluye el título del curso y la matrícula máxima, correspondiente a los campos title y enrollment de cada documento.
Nota
Cada salida de ejemplo muestra valores _data, clusterTime y ObjectID truncados porque el controlador los genera de forma única.
Abre un flujo de cambios
Puede observar los cambios en MongoDB utilizando el método Watch() en los siguientes objetos:
Colección: supervisar los cambios en una colección específica
Base de datos: Supervisa los cambios en todas las colecciones de una base de datos
MongoClient: Supervisa los cambios en todas las bases de datos
Para cada objeto, el método Watch() abre un flujo de cambios para emitir documentos de eventos de cambio cuando ocurren.
El método Watch() requiere un parámetro de contexto y un parámetro de canalización. Para devolver todos los cambios, pase un objeto Pipeline vacío.
El método Watch() utiliza opcionalmente una canalización de agregación que consta de una matriz de etapas de agregación como primer parámetro. Las etapas de agregación filtran y transforman los eventos de cambio.
Ejemplo
El siguiente ejemplo abre un flujo de cambios en la colección courses e imprime los eventos del flujo de cambios a medida que ocurren:
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
Si modifica la colección courses en un programa o shell independiente, este código imprime los cambios a medida que se producen. Insertar un documento con un valor title de "Advanced Screenwriting" y un valor enrollment de 20 genera el siguiente evento de cambio:
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
Para ver un ejemplo completamente ejecutable, consulte la sección Abrir un ejemplo de flujo de cambios: archivo completo en esta guía.
Eventos de cambio de filtro
Utilice el parámetro pipeline para modificar la salida del flujo de cambios. Este parámetro le permite observar únicamente ciertos eventos de cambio. Formatee el parámetro pipeline como una matriz de documentos, donde cada documento representa una etapa de agregación.
Puede utilizar las siguientes etapas de canalización en este parámetro:
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
El siguiente ejemplo abre un flujo de cambios en la base de datos db pero solo observa nuevas operaciones de eliminación:
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
Nota
Se llamó al método Watch() en la base de datos db, por lo que el código genera nuevas operaciones de eliminación en cualquier colección dentro de esta base de datos.
Abrir un flujo de cambios Ejemplo: Archivo completo
Nota
Configuración de ejemplo
Este ejemplo se conecta a una instancia de MongoDB mediante una URI de conexión. Para obtener más información sobre cómo conectarse a su instancia de MongoDB, consulte Guía paracrear MongoClient. Este ejemplo también utiliza la restaurants colección de la sample_restaurants base de datos incluida en los conjuntos de datos de ejemplo de Atlas. Puede cargarlos en su base de datos en la versión gratuita de MongoDB Atlas siguiendo la Guía de introducción a Atlas.
El siguiente ejemplo abre un flujo de cambios en la colección restaurants e imprime los documentos insertados:
coll := client.Database("sample_restaurants").Collection("restaurants") // Creates instructions to watch for insert operations pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}} // Creates a change stream that receives change events cs, err := coll.Watch(context.TODO(), pipeline) if err != nil { panic(err) } defer cs.Close(context.TODO()) fmt.Println("Waiting For Change Events. Insert something in MongoDB!") // Prints a message each time the change stream receives an event for cs.Next(context.TODO()) { var event bson.M if err := cs.Decode(&event); err != nil { panic(err) } output, err := json.MarshalIndent(event["fullDocument"], "", " ") if err != nil { panic(err) } fmt.Printf("%s\n", output) } if err := cs.Err(); err != nil { panic(err) }
Vea un ejemplo completamente ejecutable.
Resultado esperado
Después de ejecutar el ejemplo completo, ejecute el ejemplo de archivo completo "Insertar un documento" en un shell diferente. Al ejecutar la operación de inserción, verá un resultado similar al siguiente:
// results truncated { "_id": ..., "name": "8282", "cuisine": "Korean" }
Importante
Cuando termine de trabajar con este ejemplo de uso, asegúrese de cerrarlo cerrando su terminal.
Configurar opciones de Change Stream
Utilice el parámetro options para modificar el comportamiento del método Watch().
Puede especificar las siguientes opciones para el método Watch():
ResumeAfterStartAfterFullDocumentFullDocumentBeforeChangeBatchSizeMaxAwaitTimeCollationStartAtOperationTimeCommentShowExpandedEventsCustomCustomPipeline
Para más información sobre estas opciones, consulte db.collection.watch() entrada en el manual del servidor.
Imágenes pre y post
Al realizar una operación CRUD en una colección, el documento de evento de cambio correspondiente contiene, por defecto, solo la diferencia de los campos modificados por la operación. Puede ver el documento completo antes y después de un cambio, además de la diferencia, especificando la configuración en el parámetro options del método Watch().
Si desea ver la imagen posterior de un documento, la versión completa del documento después de un cambio, establezca el FullDocument campo del options parámetro en uno de los siguientes valores:
UpdateLookup:El documento de evento de cambio incluye una copia de todo el documento modificado.WhenAvailable:El documento de evento de cambio incluye una imagen posterior del documento modificado para eventos de cambio si la imagen posterior está disponible.Required:La salida es la misma que paraWhenAvailable, pero el controlador genera un error del lado del servidor si la imagen posterior no está disponible.
Si desea ver la imagen previa de un documento, la versión completa del documento antes de un cambio, establezca el FullDocumentBeforeChange campo del options parámetro en uno de los siguientes valores:
WhenAvailable:El documento de evento de cambio incluye una imagen previa del documento modificado para eventos de cambio si la imagen previa está disponible.Required:La salida es la misma que paraWhenAvailable, pero el controlador genera un error del lado del servidor si la imagen previa no está disponible.
Importante
Para acceder a las imágenes previas y posteriores del documento, debe habilitar changeStreamPreAndPostImages para la colección. Consulte la sección "Flujos de cambios" de la guía del comando de base de datos collMod en el manual del servidor MongoDB para obtener instrucciones y más información.
Nota
No existe una imagen previa para un documento insertado ni una imagen posterior para un documento eliminado.
Ejemplo
El siguiente ejemplo llama al método Watch() en la colección courses. Especifica un valor para el campo FullDocument del parámetro options para generar una copia de todo el documento modificado, en lugar de solo los campos modificados:
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
Actualizar el valor enrollment del documento con el title de "World
Fiction" de 35 a 30 da como resultado el siguiente evento de cambio:
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
Sin especificar la opción FullDocument, la misma operación de actualización ya no genera el valor "fullDocument" en el documento del evento de cambio.
Información Adicional
Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual del servidor.
Documentación de la API
Para obtener más información sobre el método Watch(), consulte la siguiente documentación de API: