Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home

Observe los cambios

Puede mantenerse al tanto de los cambios en los datos de MongoDB, como modificaciones en una colección, base de datos o implementación, abriendo un change stream (flujo de cambios). Un flujo de cambios permite a las aplicaciones observar los cambios en los datos y reaccionar a ellos.

El flujo de cambios devuelve documentos de eventos de cambio cuando ocurren cambios. Un evento de cambio contiene información sobre los datos actualizados.

Abre un flujo de cambios llamando a la watch() método en un objeto MongoCollection, MongoDatabase o MongoClient como se muestra en el siguiente ejemplo de código:

val changeStream = collection.watch()

El método watch() toma opcionalmente un pipeline de agregación que consiste en un arreglo de etapas como primer parámetro para filtrar y transformar la salida del evento de cambio de la siguiente manera:

val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15)))
val changeStream = collection.watch(pipeline)

El método watch() devuelve una instancia de ChangeStreamFlow, una clase que ofrece varios métodos para acceder, organizar y recorrer los resultados. ChangeStreamFlow también hereda métodos de su clase superior Flow de la librería Kotlin Coroutines.

Puede llamar a collect() en el ChangeStreamFlow para gestionar los eventos a medida que ocurren. También puedes utilizar otros métodos de la funcionalidad incorporada en Flow para trabajar con los resultados.

Para configurar opciones para procesar los documentos devueltos por el flujo de cambios, utiliza métodos miembro del objeto ChangeStreamFlow devuelto por watch(). Consulta el enlace a la documentación de API ChangeStreamFlow al final de este ejemplo para más detalles sobre los métodos disponibles.

Para capturar eventos de un flujo de cambios, llama al método collect() como se muestra a continuación:

val changeStream = collection.watch()
changeStream.collect {
println("Change observed: $it")
}

La función .collect() se activa cuando se emite un evento de cambio. Puedes especificar la lógica en la función para procesar el documento de evento cuando se reciba.

Nota

Para los eventos de cambio de la operación de actualización, los flujos de cambio devuelven por defecto solo los campos modificados y no todo el documento actualizado. Puedes configurar tu flujo de cambios para que también devuelva la versión más actual del documento llamando al método nodo fullDocument() del objeto ChangeStreamFlow con el valor FullDocument.UPDATE_LOOKUP de la siguiente manera:

val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)

El siguiente ejemplo de aplicación abre un flujo de cambios en la colección movies de la base de datos sample_mflix. La aplicación utiliza una canalización de agregación para filtrar cambios basados en operationType para que solo reciba eventos de inserción y actualización. Las eliminaciones se excluyen por omisión. La aplicación utiliza el método .collect() para recibir e imprimir los eventos de cambio filtrados que se producen en la colección.

La aplicación inicia la operación collect() en una tarea de corrutina separada, lo que permite que la aplicación siga funcionando mientras el flujo de cambios está abierto. Una vez que se completan las operaciones, la aplicación cierra el flujo de cambios y sale.

Nota

Este ejemplo se conecta a una instancia de MongoDB utilizando un URI de conexión. Para obtener más información sobre cómo conectarse a tu instancia de MongoDB, consulta el guía de conexión.

import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
data class Movie(val title: String, val year: Int)
fun main() = runBlocking {
// Replace the uri string with your MongoDB deployment's connection string
val uri = "<connection string uri>"
val mongoClient = MongoClient.create(uri)
val database = mongoClient.getDatabase("sample_mflix")
val collection = database.getCollection<Movie>("movies")
val job = launch {
val pipeline = listOf(
Aggregates.match(
Filters.`in`("operationType", mutableListOf("insert", "update"))
)
)
val changeStreamFlow = collection.watch(pipeline)
.fullDocument(FullDocument.DEFAULT)
changeStreamFlow.collect { event ->
println("Received a change to the collection: $event")
}
}
// Insert events captured by the change stream watcher
collection.insertOne(Movie("Back to the Future", 1985))
collection.insertOne(Movie("Freaky Friday", 2003))
// Update event captured by the change stream watcher
collection.updateOne(
Filters.eq(Movie::title.name, "Back to the Future"),
Updates.set(Movie::year.name, 1986)
)
// Delete event not captured by the change stream watcher
collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday"))
sleep(1000) // Give time for the change stream watcher to process all events
// Cancel coroutine job to stop the change stream watcher
job.cancel()
mongoClient.close()
}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}}
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}

Para obtener información adicional sobre las clases y métodos mencionados en esta página, consulta los siguientes recursos:

  • Entrada del Manual de Servidor de Change Streams

  • Cambiar eventos Entrada manual del servidor

  • Pipeline de agregación Entrada de Manual del Servidor

  • Etapas de agregación Entrada manual del servidor

  • ChangeStreamFlow Documentación de la API

  • MongoCollection.watch() Documentación de la API

  • MongoDatabase.watch() Documentación de la API

  • MongoClient.watch() Documentación de la API