You can keep track of changes to data in MongoDB, such as changes to a collection, database, or deployment, by opening a change stream. A change stream allows applications to watch for changes to data and react to them.
The change stream returns change event documents when changes occur. A change event contains information about the updated data.
Open a change stream by calling the watch() method on a
MongoCollection, MongoDatabase, or MongoClient object as shown in
the following code example:
val changeStream = collection.watch() 
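The same call applies at the broader scopes mentioned above. The following is a minimal sketch, assuming the sample_mflix database and a placeholder connection string; the variable names are illustrative only.

```kotlin
import com.mongodb.kotlin.client.coroutine.MongoClient
import org.bson.Document

fun main() {
    // Placeholder: replace with your deployment's connection string
    val mongoClient = MongoClient.create("<connection string uri>")
    val database = mongoClient.getDatabase("sample_mflix")
    val collection = database.getCollection<Document>("movies")

    // Change events for the movies collection only
    val collectionStream = collection.watch()
    // Change events for the collections in sample_mflix
    val databaseStream = database.watch()
    // Change events across the deployment
    val deploymentStream = mongoClient.watch()

    // Each value is a cold flow: no change stream is opened on the
    // server until the flow is collected.
    mongoClient.close()
}
```

Collecting any of the three flows works the same way; only the scope of the reported events differs.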
The watch() method optionally takes an aggregation pipeline, which
consists of a list of stages, as its first parameter. The pipeline filters
and transforms the change event output, as follows:
val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15)))
val changeStream = collection.watch(pipeline)
The watch() method returns an instance of ChangeStreamFlow, a class
that offers several methods to access, organize, and traverse the results.
ChangeStreamFlow also implements the Flow interface from the Kotlin
Coroutines library, so you can apply standard Flow operators to it.
You can call collect() on the ChangeStreamFlow to handle
events as they occur. Alternatively, you can use other methods built into Flow
to work with the results.
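As an illustration of working with the results through Flow operators rather than a bare collect(), the sketch below gathers the first three insert events and then stops. It assumes a placeholder connection string and a collection of Document values; filter, take, and toList are standard kotlinx.coroutines.flow operators, and the limit of three is arbitrary.

```kotlin
import com.mongodb.client.model.changestream.OperationType
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.bson.Document

fun main() = runBlocking {
    // Placeholder: replace with your deployment's connection string
    val mongoClient = MongoClient.create("<connection string uri>")
    val collection = mongoClient.getDatabase("sample_mflix")
        .getCollection<Document>("movies")

    // Suspends until three insert events have arrived, then completes
    val firstInserts = collection.watch()
        .filter { it.operationType == OperationType.INSERT }
        .take(3)
        .toList()

    firstInserts.forEach { println("Inserted: ${it.fullDocument}") }
    mongoClient.close()
}
```

Filtering in the client like this still transfers every event over the network; passing a pipeline to watch() filters on the server instead.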
To configure options for processing the documents returned from the change
stream, use member methods of the ChangeStreamFlow object returned
by watch(). For more details on the available methods, see the link to the
ChangeStreamFlow API documentation at the bottom of this page.
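The sketch below chains a few configuration methods before collecting. It assumes that batchSize() and maxAwaitTime() are available on ChangeStreamFlow alongside fullDocument(); confirm the exact signatures in the API documentation. The values 100 and 5 seconds are arbitrary, and the connection string is a placeholder.

```kotlin
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import org.bson.Document
import java.util.concurrent.TimeUnit

fun main() = runBlocking {
    // Placeholder: replace with your deployment's connection string
    val mongoClient = MongoClient.create("<connection string uri>")
    val collection = mongoClient.getDatabase("sample_mflix")
        .getCollection<Document>("movies")

    // Each call returns the ChangeStreamFlow, so the options can be chained
    val changeStream = collection.watch()
        .fullDocument(FullDocument.UPDATE_LOOKUP) // include the current document on updates
        .batchSize(100)                           // assumed option: events per server batch
        .maxAwaitTime(5, TimeUnit.SECONDS)        // assumed option: server wait for new events

    // first() is a standard Flow operator: it suspends until one event arrives
    val event = changeStream.first()
    println("First event: $event")
    mongoClient.close()
}
```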
Process Change Stream Events with .collect()
To capture events from a change stream, call the collect() method
as shown below:
val changeStream = collection.watch()
changeStream.collect {
    println("Change observed: $it")
}
The lambda passed to .collect() runs each time the change stream emits a
change event. You can specify logic in the lambda to process the event
document when it is received.
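As one way to process the event document, the sketch below branches on the operation type and reads the matching field. The summarize() helper is hypothetical and not part of the driver; the property names correspond to the getters on ChangeStreamDocument, and the connection string is a placeholder.

```kotlin
import com.mongodb.client.model.changestream.ChangeStreamDocument
import com.mongodb.client.model.changestream.OperationType
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.runBlocking
import org.bson.Document

// Hypothetical helper: turns a change event into a one-line summary
fun summarize(event: ChangeStreamDocument<Document>): String =
    when (event.operationType) {
        OperationType.INSERT -> "insert: ${event.fullDocument}"
        // updateDescription holds only the fields that changed
        OperationType.UPDATE -> "update: ${event.updateDescription?.updatedFields}"
        OperationType.DELETE -> "delete: ${event.documentKey}"
        else -> "other: ${event.operationType}"
    }

fun main() = runBlocking {
    // Placeholder: replace with your deployment's connection string
    val mongoClient = MongoClient.create("<connection string uri>")
    val collection = mongoClient.getDatabase("sample_mflix")
        .getCollection<Document>("movies")

    // collect() suspends for as long as the stream stays open;
    // cancel the surrounding coroutine to stop watching
    collection.watch().collect { event -> println(summarize(event)) }
}
```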
Note
For update operation change events, change streams only return the modified
fields by default rather than the entire updated document. You can configure
your change stream to also return the most current version of the document
by calling the fullDocument() member method of the ChangeStreamFlow
object with the value FullDocument.UPDATE_LOOKUP as follows:
val changeStream = collection.watch()
    .fullDocument(FullDocument.UPDATE_LOOKUP)
Example
The following example application opens a change stream on the movies collection
in the sample_mflix database. The application uses an aggregation pipeline
to filter changes based on operationType so that it only receives insert and update
events. Deletes are excluded by omission. The application uses the .collect() method
to receive and print the filtered change events that occur on the collection.
The application launches the collect() operation in a separate coroutine job,
which allows the application to continue running while the change stream is open.
Once the operations are complete, the application closes the change stream and exits.
Note
This example connects to an instance of MongoDB using a connection URI. To learn more about connecting to your MongoDB instance, see the connection guide.
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.coroutine.MongoClient
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Movie(val title: String, val year: Int)

fun main() = runBlocking {
    // Replace the uri string with your MongoDB deployment's connection string
    val uri = "<connection string uri>"
    val mongoClient = MongoClient.create(uri)
    val database = mongoClient.getDatabase("sample_mflix")
    val collection = database.getCollection<Movie>("movies")

    val job = launch {
        val pipeline = listOf(
            Aggregates.match(
                Filters.`in`("operationType", mutableListOf("insert", "update"))
            )
        )
        val changeStreamFlow = collection.watch(pipeline)
            .fullDocument(FullDocument.DEFAULT)
        changeStreamFlow.collect { event ->
            println("Received a change to the collection: $event")
        }
    }

    // Insert events captured by the change stream watcher
    collection.insertOne(Movie("Back to the Future", 1985))
    collection.insertOne(Movie("Freaky Friday", 2003))

    // Update event captured by the change stream watcher
    collection.updateOne(
        Filters.eq(Movie::title.name, "Back to the Future"),
        Updates.set(Movie::year.name, 1986)
    )

    // Delete event not captured by the change stream watcher
    collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday"))

    // Give the watcher time to process all events. delay() suspends without
    // blocking the runBlocking thread that the watcher coroutine also runs on.
    delay(1000)

    // Cancel coroutine job to stop the change stream watcher
    job.cancel()
    mongoClient.close()
}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}}
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}}
Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}
For additional information on the classes and methods mentioned on this page, see the following resources:
- Change Streams Server Manual Entry 
- Change Events Server Manual Entry 
- Aggregation Pipeline Server Manual Entry 
- Aggregation Stages Server Manual Entry 
- ChangeStreamFlow API Documentation 
- MongoCollection.watch() API Documentation 
- MongoDatabase.watch() API Documentation 
- MongoClient.watch() API Documentation