Overview
En esta guía, puedes aprender a utilizar el driver de sincronización de Kotlin para supervisar un flujo de cambios, lo que te permitirá ver los cambios en tiempo real en tu base de datos. Un flujo de cambios es una funcionalidad del MongoDB Server que publica cambios de datos en una colección, base de datos o implementación. Tu aplicación puede suscribirse a un flujo de cambios y utilizar eventos para ejecutar otras acciones.
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puedes usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de base de datos, Atlas Stream Processing administra múltiples tipos de eventos de datos y ofrece capacidades avanzadas de procesamiento de datos. Para aprender más información sobre esta funcionalidad, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.
Datos de muestra
Los ejemplos de esta guía utilizan el restaurants colección en la base de datos sample_restaurants de los Conjuntos de datos de muestra de Atlas. Para aprender a crear una implementación gratuita de MongoDB y cargar los conjuntos de datos de muestra, consulta la guía Comenzar de MongoDB.
La siguiente clase de datos de Kotlin modela los documentos de esta colección:
data class Restaurant( val name: String, val cuisine: String, )
Abre un flujo de cambios
Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método watch() determina el alcance de los eventos que el flujo de cambios escucha. Puede llamar al método watch() en instancias de las siguientes clases:
MongoClient:Para monitorear todos los cambios en la implementación de MongoDBMongoDatabase: Para supervisar los cambios en todas las colecciones de la base de datosMongoCollection:Para monitorear cambios en la colección
Abrir un ejemplo de flujo de cambios
El siguiente ejemplo abre un flujo de cambios en la colección restaurants e imprime los cambios a medida que ocurren:
collection.watch().forEach { change -> println(change) }
Para empezar a observar los cambios, ejecuta la aplicación. Luego, en una aplicación o shell por separado, realiza una operación de guardado en la colección restaurants. El siguiente ejemplo actualiza un documento en el que el valor de name es "Blarney Castle":
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle") val update = Updates.set(Restaurant::cuisine.name, "Irish") val result = collection.updateOne(filter, update)
Cuando actualizas la colección, la aplicación del flujo de cambios imprime el cambio a medida que ocurre. El evento de cambio impreso se parece al siguiente:
{ "_id": { ... }, "operationType": "update", "clusterTime": { ... }, "ns": { "db": "sample_restaurants", "coll": "restaurants" }, "updateDescription": { "updatedFields": { "cuisine": "Irish" }, "removedFields": [], "truncatedArrays": [] } ... }
Modificar la salida del flujo de cambios
Puedes pasar el parámetro pipeline al método watch() para modificar la salida del flujo de cambios. Este parámetro te permite monitorear solo los eventos de cambio especificados. Formatee el parámetro como una lista de objetos en la que cada uno representa una etapa de agregación.
Puede especificar las siguientes etapas en el parámetro pipeline:
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
Ejemplo de eventos específicos de coincidencias
El siguiente ejemplo utiliza el parámetro pipeline que incluye un $match para abrir un flujo de cambios que solo registre operaciones de actualización:
val pipeline = listOf( Aggregates.match(Filters.eq("operationType", "update")) ) collection.watch(pipeline).forEach { change -> println(change) }
Para obtener más información sobre cómo modificar la salida de tu flujo de cambios, consulta la sección Modificar salida del flujo de cambio en el manual del MongoDB Server.
Modificar el comportamiento de watch()
Puedes modificar el watch() encadenando métodos al objeto ChangeStreamIterable retornado por la llamada al método watch(). Si no especificas ninguna opción, el driver no personaliza la operación.
La siguiente tabla describe métodos que puede usar para personalizar el comportamiento de watch():
Método | Descripción |
|---|---|
| Sets the number of documents to return per batch. |
| Specifies the kind of language collation to use when sorting
results. For more information, see Collation
in the MongoDB Server manual. |
| Specifies a comment to attach to the operation. |
| Sets the fullDocument value. To learn more, see the
Include Pre-Images and Post-Images section of this document. |
| Sets the fullDocumentBeforeChange value. To learn more, see the
Include Pre-Images and Post-Images section of this document. |
| Sets the maximum await execution time on the server for this operation, in
milliseconds. |
Para obtener una lista completa de los métodos que puede utilizar para configurar el watch() método, consulte la documentación de la API ChangeStreamIterable.
Incluir imágenes preoperativas y postoperatorias
Importante
Puede habilitar pre imágenes y post imágenes en colecciones solo si su implementación utiliza MongoDB v6.0 o posterior.
Por defecto, cuando realizas una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por esa operación. Para ver el documento completo antes o después de un cambio, encadena los métodos fullDocumentBeforeChange() o fullDocument() al método watch().
La preimagen es la versión completa de un documento antes de un cambio. Para incluir la preimagen en el evento del flujo de cambios, pasa una de las siguientes opciones al método fullDocumentBeforeChange():
FullDocumentBeforeChange.WHEN_AVAILABLE:El evento de cambio incluye una imagen previa del documento modificado para eventos de cambio solo si la imagen previa está disponible.FullDocumentBeforeChange.REQUIREDEl evento de cambio incluye una preimagen del documento modificado para los eventos de cambio. Si la preimagen no está disponible, el driver genera un error.
La imagen posterior es la versión completa de un documento después de un cambio. Para incluirla en el evento de flujo de cambios, pase una de las siguientes opciones al fullDocument() método:
FullDocument.UPDATE_LOOKUP: El evento de cambio incluye una copia de todo el documento cambiado a partir de cierto tiempo después del cambio.FullDocument.WHEN_AVAILABLE: El evento de cambio incluye una postimagen del documento modificado para eventos de cambio solo si la postimagen está disponible.FullDocument.REQUIRED: El evento de cambio incluye una imagen posterior del documento modificado para eventos de cambio. Si la imagen posterior no está disponible, el driver genera un error.
El siguiente ejemplo llama el método watch() en una colección e incluye la imagen posterior de los documentos actualizados en los resultados al especificar el parámetro fullDocument:
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change -> println("Received a change: $change") }
Con la aplicación de flujo de cambios ejecutándose, actualizar un documento en la colección restaurants usando el ejemplo anterior de actualización imprime un evento de cambio semejante al siguiente:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{value=..., seconds=..., inc=...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
Para obtener más información sobre pre-imágenes y post-imágenes, consulta Change Streams con preimágenes y postimágenes de documentos en el manual de MongoDB Server.
Información Adicional
Para saber más sobre las change streams, consulta Change Streams en el manual de MongoDB Server.
Documentación de la API
Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: