Puede mantenerse al tanto de los cambios en los datos de MongoDB, como modificaciones en una colección, base de datos o implementación, abriendo un change stream (flujo de cambios). Un flujo de cambios permite a las aplicaciones observar los cambios en los datos y reaccionar a ellos.
El flujo de cambios devuelve documentos de eventos de cambio cuando ocurren cambios. Un evento de cambio contiene información sobre los datos actualizados.
Abre un flujo de cambios llamando a la watch() método en un objeto MongoCollection, MongoDatabase o MongoClient como se muestra en el siguiente ejemplo de código:
val changeStream = collection.watch()
El método watch() toma opcionalmente un pipeline de agregación que consiste en un arreglo de etapas como primer parámetro para filtrar y transformar la salida del evento de cambio de la siguiente manera:
val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15))) val changeStream = collection.watch(pipeline)
El método watch() devuelve una instancia de ChangeStreamFlow, una clase que ofrece varios métodos para acceder, organizar y recorrer los resultados. ChangeStreamFlow también hereda métodos de su clase superior Flow de la librería Kotlin Coroutines.
Puede llamar a collect() en el ChangeStreamFlow para gestionar los eventos a medida que ocurren. También puedes utilizar otros métodos de la funcionalidad incorporada en Flow para trabajar con los resultados.
Para configurar opciones para procesar los documentos devueltos por el flujo de cambios, utiliza métodos miembro del objeto ChangeStreamFlow devuelto por watch(). Consulta el enlace a la documentación de API ChangeStreamFlow al final de este ejemplo para más detalles sobre los métodos disponibles.
Procesar eventos de Change Stream con .collect()
Para capturar eventos de un flujo de cambios, llama al método collect() como se muestra a continuación:
val changeStream = collection.watch() changeStream.collect { println("Change observed: $it") }
La función .collect() se activa cuando se emite un evento de cambio. Puedes especificar la lógica en la función para procesar el documento de evento cuando se reciba.
Nota
Para los eventos de cambio de la operación de actualización, los flujos de cambio devuelven por defecto solo los campos modificados y no todo el documento actualizado. Puedes configurar tu flujo de cambios para que también devuelva la versión más actual del documento llamando al método nodo fullDocument() del objeto ChangeStreamFlow con el valor FullDocument.UPDATE_LOOKUP de la siguiente manera:
val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP)
Ejemplo
El siguiente ejemplo de aplicación abre un flujo de cambios en la colección movies de la base de datos sample_mflix. La aplicación utiliza una canalización de agregación para filtrar cambios basados en operationType para que solo reciba eventos de inserción y actualización. Las eliminaciones se excluyen por omisión. La aplicación utiliza el método .collect() para recibir e imprimir los eventos de cambio filtrados que se producen en la colección.
La aplicación inicia la operación collect() en una tarea de corrutina separada, lo que permite que la aplicación siga funcionando mientras el flujo de cambios está abierto. Una vez que se completan las operaciones, la aplicación cierra el flujo de cambios y sale.
Nota
Este ejemplo se conecta a una instancia de MongoDB utilizando un URI de conexión. Para obtener más información sobre cómo conectarse a tu instancia de MongoDB, consulta el guía de conexión.
import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates import com.mongodb.client.model.changestream.FullDocument import com.mongodb.kotlin.client.coroutine.MongoClient import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.lang.Thread.sleep data class Movie(val title: String, val year: Int) fun main() = runBlocking { // Replace the uri string with your MongoDB deployment's connection string val uri = "<connection string uri>" val mongoClient = MongoClient.create(uri) val database = mongoClient.getDatabase("sample_mflix") val collection = database.getCollection<Movie>("movies") val job = launch { val pipeline = listOf( Aggregates.match( Filters.`in`("operationType", mutableListOf("insert", "update")) ) ) val changeStreamFlow = collection.watch(pipeline) .fullDocument(FullDocument.DEFAULT) changeStreamFlow.collect { event -> println("Received a change to the collection: $event") } } // Insert events captured by the change stream watcher collection.insertOne(Movie("Back to the Future", 1985)) collection.insertOne(Movie("Freaky Friday", 2003)) // Update event captured by the change stream watcher collection.updateOne( Filters.eq(Movie::title.name, "Back to the Future"), Updates.set(Movie::year.name, 1986) ) // Delete event not captured by the change stream watcher collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday")) sleep(1000) // Give time for the change stream watcher to process all events // Cancel coroutine job to stop the change stream watcher job.cancel() mongoClient.close() }
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}} Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}} Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}
Para obtener información adicional sobre las clases y métodos mencionados en esta página, consulta los siguientes recursos:
Cambiar eventos Entrada manual del servidor
Pipeline de agregación Entrada de Manual del Servidor
Etapas de agregación Entrada manual del servidor
ChangeStreamFlow Documentación de la API
MongoCollection.watch() Documentación de la API
MongoDatabase.watch() Documentación de la API
MongoClient.watch() Documentación de la API