Overview
このガイドでは、変更ストリームを使用してデータに対するリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
サンプル データ
このガイドの例では、 Atlasサンプルデータセットのsample_restaurants.restaurantsコレクションを使用します。 MongoDB Atlasクラスターを無料で作成して、サンプルデータセットをロードする方法については、開始ガイドを参照してください。
重要
プロジェクトリ アクター ライブラリ
このガイドでは、プロジェクト Reactive ライブラリを使用して、 Java Reactive Streams ドライバー メソッドによって返された Publisher インスタンスを消費します。Project Reactive ライブラリとその使用方法の詳細については、React ドキュメントの使用開始を参照してください。このガイドでは Project React ライブラリ メソッドをどのように使用しているかについて詳しくは、 「 MongoDBへのデータの書込み 」ガイドを参照してください。
変更ストリームを開く
変更ストリームを開くには、 watch()メソッドを呼び出します。 メソッドを呼び出すインスタンスによって、変更ストリームがリッスンするイベントの範囲が決まります。 次のクラスのインスタンスでwatch()メソッドを呼び出すことができます。
MongoClient: MongoDB 配置のすべての変更を監視MongoDatabase: データベース内のすべてのコレクションの変更を監視するにはMongoCollection: コレクションの変更をモニターするには
次の例では、 restaurantsコレクションの変更ストリームを開き、変更が発生に応じて出力します。
// Opens a change stream and prints the changes as they're received ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
変更の監視を開始するには、アプリケーションを実行します。 次に、別のアプリケーションまたはshellで、restaurantsコレクションに対して書込み操作を実行します。 "name"フィールドの値が"Blarney Castle"であるドキュメントを更新すると、次の変更ストリーム出力が生成されます。
Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"}, truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, wallTime=BsonDateTime{value=...}}
変更ストリーム出力の変更
集計パイプラインをパラメーターとしてwatch()メソッドに渡して、変更ストリーム出力を変更できます。 このパラメーターを使用すると、指定された変更イベントのみを監視できます。
pipelineパラメーターでは次の集計ステージを指定できます。
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
次の例では、集計パイプラインを変更ストリームに渡して、アップデート操作のみをレコードします。
// Creates a change stream pipeline List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.eq("operationType", "update")) ); // Opens a change stream and prints the changes as they're received ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
変更ストリーム出力の変更の詳細については、MongoDB Server マニュアルの「 変更ストリーム出力 の変更 」セクションを参照してください。
watch() 動作を変更する
変更ストリーム操作を構成するために使用できるオプションを表すwatch()メソッドにメソッドを連鎖させることができます。 オプションを指定しない場合、ドライバーは操作をカスタマイズしません 。
次の表では、 watch()に連鎖させて動作をカスタマイズできるメソッドについて説明します。
オプション | 説明 |
|---|---|
| ドキュメントに加えられた変更のみを表示するのではなく、変更後に完全なドキュメントを表示するかどうかを指定します。 このオプションの詳細については、「変更前イメージと変更後イメージを含める」を参照してください。 |
| ドキュメントに加えられた変更のみを表示するのではなく、変更前のドキュメント全体を表示するかどうかを指定します。 このオプションの詳細については、「変更前イメージと変更後イメージを含める」を参照してください。 |
|
|
|
|
|
|
| 空のバッチするを返す前に、新しいデータ変更が変更ストリームカーソルに報告されるまでサーバーが待機する最大時間をミリ秒単位で指定します。 デフォルトは 1000 ミリ秒です。 |
| MongoDB Server v6.0 以降、 変更ストリームは、 |
| MongoDBクラスターからのレスポンスの各バッチするで返す変更イベントの最大数を指定します。デフォルトでは 、ドライバーはこの値を
|
| 変更ストリームカーソルに使用する 照合 を指定します。 |
| 操作にコメントを付けます。 |
変更前と変更後のイメージを含めます
重要
配置で MongoDB v 6.0以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。
デフォルトでは 、コレクションに対して操作を実行すると、対応する変更イベントには、その操作によって変更されたフィールドのデルタのみが含まれます。 変更前または変更後の完全なドキュメントを表示するには、 fullDocumentBeforeChange()メソッドまたはfullDocument()メソッドをwatch()メソッドにチェーンします。
変更前のイメージは、変更前のドキュメントの完全なバージョンです。 変更ストリームイベントに変更前のイメージを含めるには、次のいずれかの値をfullDocumentBeforeChange()メソッドに渡します。
FullDocumentBeforeChange.WHEN_AVAILABLE: 変更イベントには、変更前のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更前のイメージが含まれます。FullDocumentBeforeChange.REQUIRED: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。
変更後のイメージとは、変更後のドキュメントの完全なバージョンです。 変更ストリームイベントに変更後のイメージを含めるには、次のいずれかの値をfullDocument()メソッドに渡します。
FullDocument.UPDATE_LOOKUP: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。FullDocument.WHEN_AVAILABLE: 変更イベントには、変更後のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更後のイメージが含まれます。FullDocument.REQUIRED: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、ドライバーはエラーを発生させます。
次の例では、コレクションの変更ストリームを開き、 fullDocument()メソッドをwatch()メソッドに連結して更新されたドキュメントの変更後のイメージを含めます。
// Creates a change stream pipeline List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.eq("operationType", "update")) ); // Opens a change stream and prints the changes as they're received including the full // document after the update ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); Flux.from(changeStreamPublisher) .doOnNext(change -> System.out.println("Received change: " + change)) .blockLast();
変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。
詳細情報
Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。
API ドキュメント
このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。