Overview
このガイドでは、変更ストリームを使用してデータに対するリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
Scalaドライバーを使用する場合、watch() メソッドを呼び出して ChangeStreamObservable のインスタンスを返すことができます。その後、ChangeStreamObservableインスタンスをサブスクライブして、アップデート、挿入、削除などの新しいデータ変更を確認できます。
サンプル データ
restaurantssample_restaurantsこのガイドの例では、Atlasサンプルデータセット の データベースの コレクションを使用します。Scalaアプリケーションからこのコレクションにアクセスするには、Atlas クラスターに接続する MongoClient を作成し、database 変数と collection 変数に次の値を割り当てます。
val database: MongoDatabase = client.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
Tip
MongoDB Atlasクラスターを無料で作成して、サンプルデータセットをロードする方法については、MongoDBを使い始めるガイドを参照してください。
一部の例では、変更ストリームイベントを処理するために LatchedObserverクラスのインスタンスを使用しています。 このクラスは、変更ストリームイベントを出力し、ストリームが完了またはエラーを生成するまでデータ変更の監視を続けるカスタム オブザーバーです。 LatchedObserverクラス を使用するには、次のコードをアプリケーションファイルに貼り付けます。
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] { val latch = new CountDownLatch(1) override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument) override def onError(throwable: Throwable): Unit = { println(s"Error: '$throwable") latch.countDown() } override def onComplete(): Unit = latch.countDown() def await(): Unit = latch.await() }
変更ストリームを開く
変更ストリームを開くには、 watch()メソッドを呼び出します。 watch()メソッドを呼び出すインスタンスによって、変更ストリームが監視するイベントの範囲が決まります。 次のクラスのインスタンスでwatch()メソッドを呼び出すことができます。
MongoClient: 配置内のすべてのデータベースにわたるすべてのコレクションに対する変更を監視します。ただし、システム コレクションまたはadmin、 、 データベースのコレクションは対象外です。localconfigMongoDatabase: 1 つのデータベース内のすべてのコレクションに対する変更を監視MongoCollection: 1 つのコレクションの変更を監視
次の例では、 watch() メソッドを呼び出して、restaurantsコレクションの変更ストリームを開きます。 このコードでは、変更が発生したときに受信して出力する LatchedObserverインスタンスが作成されます。
val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await()
変更の監視を開始するには、上記のコードを実行します。 次に、別のシェルで次のコードを実行して、nameフィールドの値が "Blarney Castle" であるドキュメントを更新します。
val filter = equal("name", "Blarney Castle") val update = set("cuisine", "American") collection.updateOne(filter, update) .subscribe((res: UpdateResult) => println(res), (e: Throwable) => println(s"There was an error: $e"))
上記のコードを実行してコレクション を更新すると、変更ストリームアプリケーションは変更が発生に応じて出力します。 出力される変更イベントは、次の出力のようになります。
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {...}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
変更ストリーム出力の変更
変更ストリーム出力を変更するには、パイプラインステージのリストをパラメーターとして watch() メソッドに渡します。 リストには、次のステージを含めることができます。
$addFieldsまたは$set: ドキュメントに新しいフィールドを追加します$match: ドキュメントをフィルタリングします$project:ドキュメントフィールドのサブセットをプロジェクションします$replaceWithまたは$replaceRoot: 入力ドキュメントを指定されたドキュメントで置き換え$redact: ドキュメントのコンテンツを制限します$unset: ドキュメントからフィールドを削除します
Scalaドライバーは Aggregatesクラスを提供します。このクラスには、前述のパイプラインステージを構築するためのヘルパーメソッドが含まれています。
Tip
パイプラインステージとそれに対応する Aggregatesヘルパーメソッドの詳細については、次のリソースを参照してください。
MongoDB Serverマニュアルの「 集計ステージ 」
次の例では、 Aggregates.filter() メソッドを使用して $match ステージを構築するパイプラインを作成します。 次に、コードはこのパイプラインをwatch() メソッドに渡し、アップデート操作が発生した場合にのみイベントを出力するように watch() に指示します。
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", "update")))) observer.await()
watch() の動作を変更する
ChangeStreamObservableクラスによって提供されるメソッドを連鎖させることで、watch() メソッドの動作を変更できます。 次の表では、これらの方法の一部について説明しています。
方式 | 説明 |
|---|---|
| ドキュメントに加えられた変更のみを表示するのではなく、変更後にドキュメント全体を表示するかどうかを指定します。 このオプションの詳細については、このガイドの「 変更前とイメージと変更後のイメージを含める 」セクションを参照してください。 |
| ドキュメントに加えられた変更のみを表示するのではなく、変更前のドキュメント全体を表示するかどうかを指定します。 このオプションの詳細については、「 変更前イメージと変更後イメージを含める 」を参照してください。 |
| 操作にコメントを付けます。 |
| 指定されたタイムスタンプの時点またはその後に発生した変更のみを提供するように変更ストリームに指示します。 |
| 変更ストリームカーソルに使用する照合を設定します。 |
watch() オプションの完全なリストについては、 APIドキュメントのChangeStreamObservableを参照してください。
変更前と変更後のイメージを含めます
重要
配置でMongoDB Server v6.0 以降が使用されている場合にのみ、コレクションの変更前イメージと変更後イメージを有効にできます。
デフォルトでは 、コレクションに対して操作を実行すると、対応する変更イベントには操作の前後で変更されたフィールドとその値のみが含まれます。
watch()変更されたフィールドに加えて、ドキュメントの 変更前のイメージ 、変更前のドキュメントの完全なバージョンを返すよう、 メソッドに指示できます。変更ストリームイベントに変更前のイメージを含めるには、fullDocumentBeforeChange() メソッドを watch() にチェーンします。 次のいずれかの値を fullDocumentBeforeChange() メソッドに渡します。
FullDocumentBeforeChange.WHEN_AVAILABLE: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、この変更イベントフィールドの値はnullになります。FullDocumentBeforeChange.REQUIRED: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、サーバーはエラーを発生させます。
また、変更されたwatch() フィールドに加えて、ドキュメントの 変更後のイメージ 、変更後のドキュメントの完全なバージョンを返すよう、 メソッドに指示することもできます。変更ストリームイベントに変更後のイメージを含めるには、fullDocument() メソッドを watch() にチェーンします。 次のいずれかの値を fullDocument() メソッドに渡します。
FullDocument.UPDATE_LOOKUP: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。FullDocument.WHEN_AVAILABLE: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後イメージが利用できない場合、この変更イベントフィールドの値はnullになります。FullDocument.REQUIRED: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、サーバーはエラーを発生させます。
次の例では、コレクションで watch() メソッドを呼び出し、fullDocument() メソッドを連鎖させることで更新されたドキュメントの変更後のイメージを含めています。
val observer = LatchedObserver() collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) .subscribe(observer) observer.await()
restaurants別のシェルで実行中変更ストリームアプリケーションでは、前述の更新例を使用して コレクション内のドキュメントを更新すると、次の出力のような変更イベントが出力されます。
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Iterable((_id,BsonObjectId{...}), (address,{"building": "202-24", "coord": [-73.9250442, 40.5595462], "street": "Rockaway Point Boulevard", "zipcode": "11697"}), (borough,BsonString{value='Queens'}), (cuisine,BsonString{value='Irish'}), (grades,BsonArray{values=[...]}), (name,BsonString{value='Blarney Castle'}), (restaurant_id,BsonString{...}), (blank,BsonString{value='Irish'})), fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription= UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{...}}
Tip
変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。
詳細情報
Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。
API ドキュメント
このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。