Overview
このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションが単一のコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
アプリケーションが受信したデータをフィルタリングして変換するための集計演算子のセットを指定できます。 MongoDB 配置 v6.0 以降に接続する場合は、変更前と変更後のドキュメント データを含めるようにイベントを構成することもできます。
変更ストリームを開いて構成する方法については、次のセクションを参照してください。
変更ストリームを開く
変更ストリームを開いて、特定のタイプのデータ変更をサブスクライブし、アプリケーション内で変更イベントを生成できます。
監視するスコープの選択
変更ストリームを開くには、 MongoCollection
、 MongoDatabase
、またはMongoClient
のインスタンスで watch()
メソッドを呼び出します。
重要
スタンドアロンの MongoDB 配置では、この機能にはレプリカセットの oplog が必要なため、変更ストリームはサポートされていません。 oplogの詳細については、レプリカセットoplog MongoDB Serverのマニュアル ページ を参照してください。
watch()
メソッドを呼び出すオブジェクトによって、変更ストリームがリッスンするイベントの範囲が決まります。
MongoCollection.watch()
は、コレクションを監視します。MongoDatabase.watch()
は、 データベース内のすべてのコレクションを監視します。MongoClient.watch()
は、接続されたMongoDBデプロイ内のすべての変更を監視します。
イベントをフィルタリングする
watch()
メソッドは、最初のパラメーターとして任意の集計パイプラインを受け取ります。このパイプラインは、次のように、変更イベント出力のフィルタリングと変換に使用できるステージのリストで構成されています。
List<Bson> pipeline = List.of( Aggregates.match( Filters.in("operationType", List.of("insert", "update"))), Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
注意
アップデート操作変更イベントの場合、変更ストリームはデフォルトではアップデートされたドキュメント全体ではなく、変更されたフィールドのみを返します。 次のように、値FullDocument.UPDATE_LOOKUP
を持つChangeStreamIterable
オブジェクトのfullDocument()
メンバー メソッドを呼び出すことで、ドキュメントの最新バージョンも返すように変更ストリームを構成できます。
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
出力を管理する
watch()
メソッドは、結果にアクセスし、整理し、走査するためのいくつかのメソッドを提供するインターフェースである ChangeStreamIterable
のインスタンスを返します。ChangeStreamIterable
は、コアJavaインターフェース Iterable
を実装する親インターフェースである MongoIterable
からメソッドも継承します。
ChangeStreamIterable
でforEach()
を呼び出して、イベントが発生したときに処理するか、結果を走査するために使用できるMongoChangeStreamCursor
インスタンスを返すiterator()
メソッドを使用できます。
MongoChangeStreamCursor
インスタンスでは次のメソッドを呼び出すことができます。
hasNext()
: さらに結果があるかどうかを確認しますnext()
:コレクション内の次のドキュメントを返しますtryNext()
:変更ストリーム内の次の利用可能な要素をすぐに返します、またはnull
重要
カーソルを反復処理すると現在のスレッドがブロックされます
forEach()
または任意の iterator()
メソッドを使用してカーソルを反復処理すると、対応する変更ストリームがイベントをリッスンしている間は現在のスレッドがブロックされます。リクエストの処理やユーザー入力への応答など、他のロジックをプログラムで実行し続ける必要がある場合は、別のスレッドで変更ストリームを作成してリッスンすることを検討してください。
他のクエリによって返される MongoCursor
とは異なり、変更ストリームに関連付けられた MongoChangeStreamCursor
は変更イベントが到達するまで待機してから、next()
から結果を返します。その結果、変更ストリームの MongoChangeStreamCursor
を使用して next()
を呼び出した場合、java.util.NoSuchElementException
はスローされません。
変更ストリームから返されたドキュメントを処理するためのオプションを構成するには、 watch()
によって返されるChangeStreamIterable
オブジェクトのメンバー メソッドを使用します。 利用可能なメソッドの詳細については、この例の下部にあるChangeStreamIterable
API ドキュメントへのリンクを参照してください。
例
この例では、 myColl
コレクションで変更ストリームを開き、変更ストリーム イベントを発生に応じて出力する方法を示します。
ドライバーは変更ストリーム イベントをChangeStreamIterable
型の変数に保存します。 次の例では、ドライバーがChangeStreamIterable
オブジェクトにDocument
型を入力することを指定します。 その結果、ドライバーは個々の変更ストリーム イベントをChangeStreamDocument
オブジェクトとして保存します。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
コレクションに対して挿入操作を実行すると、次の出力が生成されます。
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
監視例: 完全なファイル
注意
セットアップ例
この例では、接続 URI を使用してMongoDBのインスタンスに接続します。MongoDBインスタンスへの接続の詳細については、「 MongoClient の作成ガイド 」を参照してください。この例では、Atlasサンプルデータセットに含まれる sample_mflix
データベースの movies
コレクションも使用します。「Atlas を使い始める」ガイドに従って、 MongoDB Atlasの無料階層のデータベースにロードできます。
この例では、 監視メソッドを使用して変更ストリームを開く方法を示しています。Watch.java
ファイルは、パイプラインを引数として watch()
メソッドを呼び出し、"insert"
イベントと "update"
イベントのみをフィルタリングします。WatchCompanion.java
ファイルは、ドキュメントを挿入、更新、削除します。
次の例を使用するには、ファイルをこの順序で実行します。
Watch.java
ファイルを実行します。WatchCompanion.java
ファイルを実行します。
注意
WatchCompanion.java
ファイルが実行されるまで、Watch.java
ファイルは実行中を続けます。
Watch.java
:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package org.example; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.Aggregates; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion.java
:
// Performs CRUD operations to generate change events when run with the Watch application package org.example; import org.bson.Document; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.model.Updates; import com.mongodb.client.result.UpdateResult; import com.mongodb.client.result.DeleteResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
完全なファイルの出力例
上記のアプリケーションでは、次の出力が生成されます。
Watch.java
は、insert
操作と update
操作のみをキャプチャします。これは、集計パイプラインがdelete
操作をフィルタリングで除外するためです。
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
WatchCompanion
は 完了した操作 の概要を出力します。
Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
watch()
メソッドの詳細については、次の API ドキュメントを参照してください。
変更ストリームへの集計演算子の適用
集計パイプラインをパラメーターとしてwatch()
メソッドに渡して、変更ストリームが受信する変更イベントを指定できます。
MongoDB Server のバージョンがサポートする集計演算子については、「変更ストリーム出力の変更 」を参照してください。
例
次のコード例は、集計パイプラインを適用して、挿入操作とアップデート操作のみの変更イベントを受け取るように変更ストリームを構成する方法を示しています。
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
コレクションに対してアップデート操作を実行すると、次の出力が生成されます。
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
大規模な変更ストリーム イベントの分裂
MongoDB 7.0 以降では、 $changeStreamSplitLargeEvent
集計ステージを使用して 16 MB を超えるイベントを小さなフラグメントに分割できます。
厳密に必要な場合にのみ$changeStreamSplitLargeEvent
を使用してください。 たとえば、アプリケーションで完全なドキュメントの変更前または変更後のイメージが必要で、16 MB を超えるイベントが生成される場合は、 $changeStreamSplitLargeEvent
を使用します。
$changeStreamSplitLargeEvent ステージはフラグメントを順番に返します。 変更ストリーム カーソル を使用してフラグメントにアクセスできます。 各フラグメントには、次のフィールドを含むSplitEvent
オブジェクトが含まれます。
フィールド | 説明 |
---|---|
| フラグメントのインデックス(開始) |
| 分裂イベントを構成するフラグメントの合計数 |
次の例では、 $changeStreamSplitLargeEvent
集計ステージを使用して大規模なイベントを分割し、変更ストリームを変更します。
ChangeStreamIterable<Document> changeStream = collection.watch( List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
注意
集計パイプラインには$changeStreamSplitLargeEvent
ステージを 1 つだけ含めることができ、パイプラインの最後のステージである必要があります。
次の例に示すように、変更ストリーム カーソルでgetSplitEvent()
メソッドを呼び出してSplitEvent
にアクセスできます。
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
$changeStreamSplitLargeEvent
集計ステージの詳細については、 $changeStreamSplitLargetサーバーのドキュメントを参照してください。
変更前と変更後のイメージを含みます
変更イベントは、以下のデータを含めるか省略するように構成できます。
変更前のイメージ: 操作前のドキュメントのバージョンを表すドキュメント(存在する場合)
変更後のイメージ: 操作後のドキュメントのバージョンを表すドキュメント(存在する場合)
重要
配置で MongoDB v6.0 以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。
変更前イメージまたは変更後イメージを含む変更ストリーム イベントを受信するには、次のアクションを実行する必要があります。
MongoDB 配置のコレクションの変更前と変更後のイメージを有効にします。
Tip
配置で変更前と変更後のイメージを有効にする方法については、サーバー マニュアルの「 Change Streams とドキュメントの変更前イメージおよび変更後イメージ 」を参照してください。
変更前と変更後のイメージが有効になっているコレクションを作成するようにドライバーに指示する方法については、「 変更前と変更後のイメージが有効になっているコレクションの作成 」セクションを参照してください。
変更ストリームを設定して、変更前のイメージと変更後のイメージのどちらかを取得します。
Tip
変更イベントに変更前のイメージを記録するように変更ストリームを構成する方法について詳しくは、「 変更前イメージの構成例 」を参照してください。
変更イベントで変更後のイメージを記録するように変更ストリームを構成するには、「 変更後のイメージの構成例 」を参照してください。
変更前と変更後のイメージが有効になっているコレクションの作成
ドライバーを使用して、変更前イメージと変更後イメージ オプションが有効になっているコレクションを作成するには、次の例に示すように、 ChangeStreamPreAndPostImagesOptions
のインスタンスを指定し、 createCollection()
メソッドを呼び出します。
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
MongoDB Shell からcollMod
コマンドを実行すると、既存のコレクションの変更前イメージと変更後イメージ オプションを変更できます。 この操作を実行する方法については、サーバー マニュアルのcollModに関するエントリを参照してください。
警告
コレクションで変更前イメージまたは変更後イメージを有効にした場合、 collMod
を使用してこれらの設定を変更すると、そのコレクションの既存の変更ストリームが失敗する可能性があります。
変更前のイメージの構成例
次のコード例は、 myColl
コレクションの変更ストリームを構成して、変更前のイメージを含め、変更イベントを出力する方法を示しています。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
上記の例では、変更ストリームがFullDocumentBeforeChange.REQUIRED
オプションを使用するように構成されています。 このオプションは、置換、アップデート、削除する変更イベントに変更前のイメージを要求するように変更ストリームを構成します。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。
ドキュメントのamount
フィールドの値を150
から2000
に更新するとします。 この変更イベントにより、次の出力が生成されます。
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
オプションのリストについては、 fullDocumentBeforechange APIドキュメントを参照してください。
変更後のイメージの構成例
次のコード例は、 myColl
コレクションの変更ストリームを構成して、変更前のイメージを含め、変更イベントを出力する方法を示しています。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
上記の例では、変更ストリームがFullDocument.WHEN_AVAILABLE
オプションを使用するように構成されています。 このオプションは、使用可能な場合、置換および更新の変更されたドキュメントの変更後のイメージを返すように変更ストリームを構成します。
ドキュメントのcolor
フィールドの値を"purple"
から"pink"
に更新するとします。 変更イベントによって、次の出力が生成されます。
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
詳細情報
API ドキュメント
変更ストリームの管理に使用されるメソッドとクラスの詳細については、次のAPIドキュメントを参照してください。