変更ストリームを開くと、コレクション、データベース、または配置への変更など、MongoDB のデータに対する変更を追跡できます。 変更ストリームを使用すると、アプリケーションはデータの変更をReactし、それに対応できます。
変更ストリームは、変更が発生したときに変更イベントドキュメントを返します。 変更イベントにはアップデートされたデータに関する情報が含まれます。
次のコード例に示すように、 MongoCollection 、 MongoDatabase 、またはMongoClientオブジェクトで watch()メソッドを呼び出して変更ストリームを開きます。
val changeStream = collection.watch()
watch()メソッドは、変更イベント出力を次のようにフィルタリングして変換する最初のパラメーターとして、ステージの配列で構成される集計パイプラインをオプションで使用します。
val pipeline = listOf(Aggregates.match(Filters.lt("fullDocument.runtime", 15))) val changeStream = collection.watch(pipeline)
watch()メソッドは、結果にアクセスし、整理し、走査するためのいくつかのメソッドを提供するクラスであるChangeStreamFlowのインスタンスを返します。 ChangeStreamFlowは、 Kotlin コルーチン ライブラリの親クラスFlowからメソッドも継承します。
ChangeStreamFlowでcollect()を呼び出して、イベントが発生したときに処理できます。 あるいは、 Flowに組み込まれている他のメソッドを使用して結果を操作することもできます。
変更ストリームから返されたドキュメントを処理するためのオプションを構成するには、 watch()によって返されるChangeStreamFlowオブジェクトのメンバー メソッドを使用します。 利用可能なメソッドの詳細については、この例の下部にあるChangeStreamFlow API ドキュメントへのリンクを参照してください。
.collection() による変更ストリーム イベントの処理
変更ストリームからイベントをキャプチャするには、次のようにcollect()メソッドを呼び出します。
val changeStream = collection.watch() changeStream.collect { println("Change observed: $it") }
.collect()関数は、変更イベントが発生したときにトリガーされます。 イベント ドキュメントを受信時に処理するよう、関数内でロジックを指定できます。
注意
アップデート操作変更イベントの場合、変更ストリームはデフォルトではアップデートされたドキュメント全体ではなく、変更されたフィールドのみを返します。 次のように、値FullDocument.UPDATE_LOOKUPを持つChangeStreamFlowオブジェクトのfullDocument()メンバー メソッドを呼び出すことで、ドキュメントの最新バージョンも返すように変更ストリームを構成できます。
val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP)
例
次のサンプルアプリケーションでは、 sample_mflixデータベース内のmoviesコレクションの変更ストリームを開きます。 アプリケーションは集計パイプラインを使用して、 operationTypeに基づいて変更をフィルタリングし、挿入イベントと更新イベントのみを受け取るようにします。 削除は 省略 によって除外されます。 アプリケーションは.collect()メソッドを使用して、コレクションで発生するフィルタリングされた変更イベントを受信して出力します。
アプリケーションはcollect()操作を別のコルーチン ジョブで起動するため、変更ストリームが開いている間もアプリケーションは実行を継続できます。 操作が完了すると、アプリケーションは変更ストリームを閉じて終了します。
注意
この例では、接続 URI を使用してMongoDBのインスタンスに接続します。 MongoDBインスタンスへの接続の詳細については、 接続ガイドを参照してください。
import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates import com.mongodb.client.model.changestream.FullDocument import com.mongodb.kotlin.client.coroutine.MongoClient import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.lang.Thread.sleep data class Movie(val title: String, val year: Int) fun main() = runBlocking { // Replace the uri string with your MongoDB deployment's connection string val uri = "<connection string uri>" val mongoClient = MongoClient.create(uri) val database = mongoClient.getDatabase("sample_mflix") val collection = database.getCollection<Movie>("movies") val job = launch { val pipeline = listOf( Aggregates.match( Filters.`in`("operationType", mutableListOf("insert", "update")) ) ) val changeStreamFlow = collection.watch(pipeline) .fullDocument(FullDocument.DEFAULT) changeStreamFlow.collect { event -> println("Received a change to the collection: $event") } } // Insert events captured by the change stream watcher collection.insertOne(Movie("Back to the Future", 1985)) collection.insertOne(Movie("Freaky Friday", 2003)) // Update event captured by the change stream watcher collection.updateOne( Filters.eq(Movie::title.name, "Back to the Future"), Updates.set(Movie::year.name, 1986) ) // Delete event not captured by the change stream watcher collection.deleteOne(Filters.eq(Movie::title.name, "Freaky Friday")) sleep(1000) // Give time for the change stream watcher to process all events // Cancel coroutine job to stop the change stream watcher job.cancel() mongoClient.close() }
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C0000000022B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C0E6873977DD9059EE0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Back to the Future, year=1985), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c0e6873977dd9059ee"}}, clusterTime=Timestamp{value=7234215589353357314, seconds=1684347072, inc=2}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347072952}} Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "82646518C1000000012B022C0100296E5A1004782683FAB5A741B0B0805C207A7FCCED46645F69640064646518C1E6873977DD9059EF0004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Movie(title=Freaky Friday, year=2003), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "646518c1e6873977dd9059ef"}}, clusterTime=Timestamp{value=7234215593648324609, seconds=1684347073, inc=1}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684347073112}} Received a change to the collection: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "8264651D4A000000042B022C0100296E5A1004CAEADF0D7376406A8197E3082CDB3D3446645F6964006464651D4A8C2D2556BA204FB40004"}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "64651d4a8c2d2556ba204fb4"}}, clusterTime=Timestamp{value=7234220580105355268, seconds=1684348234, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"year": 1986}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1684348234958}}
このページで言及されているクラスとメソッドについての追加情報については、次のリソースを参照してください。
Change Streams Server の手動入力
変更イベントサーバー マニュアル エントリ
集計パイプラインサーバーのマニュアルエントリ
集計ステージサーバー マニュアル エントリ
ChangeStreamFlow APIドキュメント
MongoCollection.watch() APIドキュメント
MongoDatabase.watch() APIドキュメント
MongoClient.watch()APIドキュメント