Docs Menu
Docs Home
/ /

Monitorear datos con flujos de cambios

En esta guía, aprenderá a usar el controlador Kotlin Sync para supervisar un flujo de cambios, lo que le permitirá ver los cambios en tiempo real en su base de datos. Un flujo de cambios es una función de MongoDB Server que publica los cambios en los datos de una colección, base de datos o implementación. Su aplicación puede suscribirse a un flujo de cambios y usar eventos para realizar otras acciones.

Tip

Atlas Stream Processing

Como alternativa a los flujos de cambios, puede utilizar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.

Los ejemplos de esta guía utilizan el restaurants Colección en la base de datos sample_restaurants de los conjuntos de datos de muestra de Atlas. Para aprender a crear una implementación gratuita de MongoDB y cargar los conjuntos de datos de muestra, consulte la guía de introducción a MongoDB.

La siguiente clase de datos de Kotlin modela los documentos de esta colección:

data class Restaurant(
val name: String,
val cuisine: String,
)

Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método watch() determina el alcance de los eventos que el flujo de cambios escucha. Puede llamar al método watch() en instancias de las siguientes clases:

  • MongoClient:Para monitorear todos los cambios en la implementación de MongoDB

  • MongoDatabase:Para monitorear los cambios en todas las colecciones de la base de datos

  • MongoCollection:Para monitorear cambios en la colección

El siguiente ejemplo abre un flujo de cambios en la colección restaurants e imprime los cambios a medida que ocurren:

collection.watch().forEach { change ->
println(change)
}

Para empezar a observar los cambios, ejecute la aplicación. Luego, en una aplicación o shell independiente, realice una operación de escritura en la colección restaurants. El siguiente ejemplo actualiza un documento cuyo valor de name es "Blarney Castle":

val filter = Filters.eq(Restaurant::name.name, "Blarney Castle")
val update = Updates.set(Restaurant::cuisine.name, "Irish")
val result = collection.updateOne(filter, update)

Al actualizar la colección, la aplicación de flujo de cambios imprime el cambio a medida que se produce. El evento de cambio impreso se parece al siguiente:

{
"_id": { ... },
"operationType": "update",
"clusterTime": { ... },
"ns": {
"db": "sample_restaurants",
"coll": "restaurants"
},
"updateDescription": {
"updatedFields": {
"cuisine": "Irish"
},
"removedFields": [],
"truncatedArrays": []
}
...
}

Puede pasar el parámetro pipeline al método watch() para modificar la salida del flujo de cambios. Este parámetro le permite observar únicamente los eventos de cambio especificados. Formatee el parámetro como una lista de objetos, cada uno de los cuales representa una etapa de agregación.

Puede especificar las siguientes etapas en el parámetro pipeline:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

El siguiente ejemplo utiliza el parámetro pipeline que incluye un $match para abrir un flujo de cambios que solo registra operaciones de actualización:

val pipeline = listOf(
Aggregates.match(Filters.eq("operationType", "update"))
)
collection.watch(pipeline).forEach { change ->
println(change)
}

Para obtener más información sobre cómo modificar la salida de su flujo de cambios, consulte la sección Modificar la salida del flujo de cambios en el manual de MongoDB Server.

Puede modificar watch() encadenando métodos al objeto ChangeStreamIterable devuelto por la llamada al método watch(). Si no especifica ninguna opción, el controlador no personaliza la operación.

La siguiente tabla describe métodos que puede usar para personalizar el comportamiento de watch():

Método
Descripción

batchSize()

Sets the number of documents to return per batch.

collation()

Specifies the kind of language collation to use when sorting results. For more information, see Collation in the MongoDB Server manual.

comment()

Specifies a comment to attach to the operation.

fullDocument()

Sets the fullDocument value. To learn more, see the Include Pre-Images and Post-Images section of this document.

fullDocumentBeforeChange()

Sets the fullDocumentBeforeChange value. To learn more, see the Include Pre-Images and Post-Images section of this document.

maxAwaitTime()

Sets the maximum await execution time on the server for this operation, in milliseconds.

Para obtener una lista completa de los métodos que puede utilizar para configurar el watch() método, consulte la documentación de la API ChangeStreamIterable.

Importante

Puede habilitar imágenes previas y posteriores en colecciones solo si su implementación usa MongoDB v6.0 o posterior.

De forma predeterminada, al realizar una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por dicha operación. Para ver el documento completo antes o después de un cambio, encadene los métodos fullDocumentBeforeChange() o fullDocument() al método watch().

La preimagen es la versión completa de un documento antes de un cambio. Para incluir la preimagen en el evento de flujo de cambios, pase una de las siguientes opciones al fullDocumentBeforeChange() método:

  • FullDocumentBeforeChange.WHEN_AVAILABLE:El evento de cambio incluye una imagen previa del documento modificado para eventos de cambio solo si la imagen previa está disponible.

  • FullDocumentBeforeChange.REQUIREDEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el controlador genera un error.

La imagen posterior es la versión completa de un documento después de un cambio. Para incluirla en el evento de flujo de cambios, pase una de las siguientes opciones al fullDocument() método:

  • FullDocument.UPDATE_LOOKUP:El evento de cambio incluye una copia de todo el documento modificado desde algún tiempo después del cambio.

  • FullDocument.WHEN_AVAILABLE:El evento de cambio incluye una imagen posterior del documento modificado solo para eventos de cambio si la imagen posterior está disponible.

  • FullDocument.REQUIREDEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el controlador genera un error.

El siguiente ejemplo llama el método watch() en una colección e incluye la imagen posterior de los documentos actualizados en los resultados al especificar el parámetro fullDocument:

collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change ->
println("Received a change: $change")
}

Con la aplicación de flujo de cambios en ejecución, actualizar un documento en la restaurants colección mediante el ejemplo de actualización anterior imprime un evento de cambio similar al siguiente:

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish),
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}},
clusterTime=Timestamp{value=..., seconds=..., inc=...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null,
wallTime=BsonDateTime{value=...}}

Para obtener más información sobre imágenes previas y posteriores, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual de MongoDB Server.

Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB Server.

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API:

Volver

Monitoring

En esta página