AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Docs Menu

変更ストリーム

このガイドでは、変更ストリームを使用してデータに対するリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。

このガイドの例では、 Atlasサンプルデータセットsample_restaurants.restaurantsコレクションを使用します。 MongoDB Atlasクラスターを無料で作成して、サンプルデータセットをロードする方法については、開始ガイドを参照してください。

重要

プロジェクトリ アクター ライブラリ

このガイドでは、プロジェクト Reactive ライブラリを使用して、 Java Reactive Streams ドライバー メソッドによって返された Publisher インスタンスを消費します。Project Reactive ライブラリとその使用方法の詳細については、React ドキュメントの使用開始を参照してください。このガイドでは Project React ライブラリ メソッドをどのように使用しているかについて詳しくは、 「 MongoDBへのデータの書込み 」ガイドを参照してください。

変更ストリームを開くには、 watch()メソッドを呼び出します。 メソッドを呼び出すインスタンスによって、変更ストリームがリッスンするイベントの範囲が決まります。 次のクラスのインスタンスでwatch()メソッドを呼び出すことができます。

  • MongoClient: MongoDB 配置のすべての変更を監視

  • MongoDatabase: データベース内のすべてのコレクションの変更を監視するには

  • MongoCollection: コレクションの変更をモニターするには

次の例では、 restaurantsコレクションの変更ストリームを開き、変更が発生に応じて出力します。

// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch();
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

変更の監視を開始するには、アプリケーションを実行します。 次に、別のアプリケーションまたはshellで、restaurantsコレクションに対して書込み操作を実行します。 "name"フィールドの値が"Blarney Castle"であるドキュメントを更新すると、次の変更ストリーム出力が生成されます。

Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null,
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{value=...}}

集計パイプラインをパラメーターとしてwatch()メソッドに渡して、変更ストリーム出力を変更できます。 このパラメーターを使用すると、指定された変更イベントのみを監視できます。

pipelineパラメーターでは次の集計ステージを指定できます。

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

次の例では、集計パイプラインを変更ストリームに渡して、アップデート操作のみをレコードします。

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

変更ストリーム出力の変更の詳細については、MongoDB Server マニュアルの「 変更ストリーム出力 の変更 」セクションを参照してください。

変更ストリーム操作を構成するために使用できるオプションを表すwatch()メソッドにメソッドを連鎖させることができます。 オプションを指定しない場合、ドライバーは操作をカスタマイズしません 。

次の表では、 watch()に連鎖させて動作をカスタマイズできるメソッドについて説明します。

オプション
説明

fullDocument()

ドキュメントに加えられた変更のみを表示するのではなく、変更後に完全なドキュメントを表示するかどうかを指定します。 このオプションの詳細については、「変更前イメージと変更後イメージを含める」を参照してください。

fullDocumentBeforeChange()

ドキュメントに加えられた変更のみを表示するのではなく、変更前のドキュメント全体を表示するかどうかを指定します。 このオプションの詳細については、「変更前イメージと変更後イメージを含める」を参照してください。

resumeAfter()

watch()再開トークンで指定された操作の後に変更を返すことを再開するように に指示します。各変更ストリームのイベントドキュメントには、 フィールドとして再開トークンが含まれています。変更後に再開する操作を表す変更イベントドキュメントの
_id_idフィールド全体を渡します。
resumeAfter() は およびstartAfter() startAtOperationTime()と排他関係にあります。

startAfter()

watch()再開トークンで指定された操作の後に新しい変更ストリームを開始するように に指示します。無効化イベント後に通知を再開できるようにします。各変更ストリームのイベントドキュメントには、 フィールドとして再開トークンが含まれています。変更後に再開する操作を表す変更イベントドキュメントの
_id_idフィールド全体を渡します。
startAfter()resumeAfter() およびstartAtOperationTime() と排他関係にあります。

startAtOperationTime()

watch()指定されたタイムスタンプ後に発生したイベントのみを返すように に指示します。
startAtOperationTime() resumeAfter()は およびstartAfter() と排他関係にあります。

maxAwaitTime()

空のバッチするを返す前に、新しいデータ変更が変更ストリームカーソルに報告されるまでサーバーが待機する最大時間をミリ秒単位で指定します。 デフォルトは 1000 ミリ秒です。

showExpandedEvents()

MongoDB Server v6.0 以降、 変更ストリームは、createIndexes イベントや dropIndexes イベントなどのデータ定義言語(DDL)イベントの変更通知をサポートします。 展開されたイベントを変更ストリームに含めるには、このメソッドを呼び出して 値 true を渡します。

batchSize()

MongoDBクラスターからのレスポンスの各バッチするで返す変更イベントの最大数を指定します。デフォルトでは 、ドライバーはこの値を Long.MAX_VALUE に設定します。

0batchSize は、カーソルは作成されますが、 最初のバッチするではドキュメントは返されないことを意味します。

collation()

変更ストリームカーソルに使用する 照合 を指定します。

comment()

操作にコメントを付けます。

重要

配置で MongoDB v 6.0以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。

デフォルトでは 、コレクションに対して操作を実行すると、対応する変更イベントには、その操作によって変更されたフィールドのデルタのみが含まれます。 変更前または変更後の完全なドキュメントを表示するには、 fullDocumentBeforeChange()メソッドまたはfullDocument()メソッドをwatch()メソッドにチェーンします。

変更前のイメージは、変更のドキュメントの完全なバージョンです。 変更ストリームイベントに変更前のイメージを含めるには、次のいずれかの値をfullDocumentBeforeChange()メソッドに渡します。

  • FullDocumentBeforeChange.WHEN_AVAILABLE: 変更イベントには、変更前のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更前のイメージが含まれます。

  • FullDocumentBeforeChange.REQUIRED: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。

変更後のイメージとは、変更のドキュメントの完全なバージョンです。 変更ストリームイベントに変更後のイメージを含めるには、次のいずれかの値をfullDocument()メソッドに渡します。

  • FullDocument.UPDATE_LOOKUP: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。

  • FullDocument.WHEN_AVAILABLE: 変更イベントには、変更後のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更後のイメージが含まれます。

  • FullDocument.REQUIRED: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、ドライバーはエラーを発生させます。

次の例では、コレクションの変更ストリームを開き、 fullDocument()メソッドをwatch()メソッドに連結して更新されたドキュメントの変更後のイメージを含めます。

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received including the full
// document after the update
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。

Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。

このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。