Docs Menu
Docs Home

Esté atento a los cambios

Puede realizar un seguimiento de los cambios en los datos de MongoDB, como los cambios en una colección, base de datos o implementación, abriendo un flujo de cambios. Un flujo de cambios permite a las aplicaciones detectar cambios en los datos y reaccionar ante ellos.

El flujo de cambios devuelve documentos de eventos de cambio cuando se producen cambios. Un evento de cambio contiene información sobre los datos actualizados.

Abra un flujo de cambios llamando al watch() método en un objeto MongoCollection, MongoDatabase o MongoClient como se muestra en el siguiente ejemplo de código:

val changeStream = collection.watch()

El watch() método toma opcionalmente una secuencia de agregación que consta de una matriz de etapas como el primer parámetro para filtrar y transformar la salida del evento de cambio de la siguiente manera:

val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15)))
val changeStream = collection.watch(pipeline)

El método watch() devuelve una instancia de ChangeStreamFlow, una clase que ofrece varios métodos para acceder, organizar y recorrer los resultados. ChangeStreamFlow también hereda métodos de su clase padre Flow de la biblioteca Kotlin Coroutines.

Puedes llamar a collect() en ChangeStreamFlow para gestionar los eventos a medida que ocurren. Como alternativa, puedes usar otros métodos integrados en Flow para trabajar con los resultados.

Para configurar opciones para procesar los documentos devueltos por el flujo de cambios, utiliza métodos miembro del objeto ChangeStreamFlow devuelto por watch(). Consulta el enlace a la documentación de API ChangeStreamFlow al final de este ejemplo para más detalles sobre los métodos disponibles.

Para capturar eventos de un flujo de cambios, llame al método collect() como se muestra a continuación:

val changeStream = collection.watch()
changeStream.collect {
println("Change observed: $it")
}

La función .collect() se activa al emitirse un evento de cambio. Puede especificar la lógica en la función para procesar el documento de evento al recibirlo.

Nota

Para los eventos de cambio de la operación de actualización, los flujos de cambio solo devuelven los campos modificados por defecto, en lugar de todo el documento actualizado. Puede configurar su flujo de cambio para que también devuelva la versión más reciente del documento llamando al método miembro fullDocument() del objeto ChangeStreamFlow con el valor FullDocument.UPDATE_LOOKUP, como se indica a continuación:

val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)

La siguiente aplicación de ejemplo abre un flujo de cambios en la colección movies de la base de datos sample_mflix. La aplicación utiliza una canalización de agregación para filtrar los cambios según operationType, de modo que solo recibe eventos de inserción y actualización. Las eliminaciones se excluyen por omisión. La aplicación utiliza el método .collect() para recibir e imprimir los eventos de cambio filtrados que ocurren en la colección.

La aplicación inicia la operación collect() en una corrutina independiente, lo que permite que la aplicación continúe ejecutándose mientras el flujo de cambios esté abierto. Una vez completadas las operaciones, la aplicación cierra el flujo de cambios y sale.

Nota

Este ejemplo se conecta a una instancia de MongoDB mediante una URI de conexión. Para obtener más información sobre cómo conectarse a su instancia de MongoDB, consulte Guía de conexión.

import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
data class Movie(val title: String, val year: Int)
fun main() = runBlocking {
// Replace the uri string with your MongoDB deployment's connection string
val uri = "<connection string uri>"
val mongoClient = MongoClient.create(uri)
val database = mongoClient.getDatabase("sample_mflix")
val collection = database.getCollection<Movie>("movies")
val job = launch {
val pipeline = listOf(
Aggregates.match(
Filters.`in`("operationType", mutableListOf("insert", "update"))
)
)
val changeStreamFlow = collection.watch(pipeline)
.fullDocument(FullDocument.DEFAULT)
changeStreamFlow.collect { event ->
println("Received a change to the collection: $event")
}
}
// Insert events captured by the change stream watcher
collection.insertOne(Movie("Back to the Future", 1985))
collection.insertOne(Movie("Freaky Friday", 2003))
// Update event captured by the change stream watcher
collection.updateOne(
Filters.eq(Movie::title.name, "Back to the Future"),
Updates.set(Movie::year.name, 1986)
)
// Delete event not captured by the change stream watcher
collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday"))
sleep(1000) // Give time for the change stream watcher to process all events
// Cancel coroutine job to stop the change stream watcher
job.cancel()
mongoClient.close()
}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}}
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}

Para obtener información adicional sobre las clases y métodos mencionados en esta página, consulte los siguientes recursos:

  • Entrada manualdel servidor de flujos de cambios

  • Entrada manual del servidor de eventos de cambio

  • Entrada manual del servidor decanalización de agregación

  • Entrada manualdel servidor de etapas de agregación

  • Flujo de flujo de cambios Documentación de la API

  • Documentación de la APIMongoCollection.watch()

  • Documentación de la APIMongoDatabase.watch()

  • MongoClient.watch() Documentación de la API