MongoDB 3.6では、 $changeStream集計パイプライン演算子が導入されています。
変更ストリームは、コレクション内のドキュメントに対する変更を監視する方法を提供します。 この新しいステージの使いやすさを向上させるため、 MongoCollectionタイプには新しいwatch()メソッドが含まれています。 ChangeStreamPublisherインスタンスは変更ストリームを設定し、回復可能なエラーが発生した場合は自動的に再開を試みます。
前提条件
このガイドのコード例を実行するには、次のコンポーネントを設定する必要があります。
test.restaurantsドキュメントrestaurants.jsonアセットGithubの ファイルのドキュメントが入力された コレクション。次のインポート ステートメントは次のとおりです。
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.ChangeStreamDocument; import org.bson.Document;
重要
このガイドでは、 クイック スタート プライマリで説明されているSubscriberの実装を使用します。
MongoDB 配置への接続
まず、MongoDB 配置に接続し、 インスタンスとMongoDatabase MongoCollectionインスタンスを 宣言して定義します。
次のコードは、ポート27017のlocalhostで実行されているスタンドアロンの MongoDB 配置に接続します。 次に、 testデータベースを参照するためのdatabase変数と、 restaurantsコレクションを参照するためのcollection変数を定義します。
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
MongoDB 配置への接続の詳細については、「 MongoDB への接続」チュートリアルを参照してください。
コレクションの変更の監視
変更ストリームを作成するには、 MongoCollection.watch()メソッドのいずれかを使用します。
次の例では、変更ストリームは、観察されたすべての変更を出力します。
collection.watch().subscribe(new PrintDocumentSubscriber());
データベースの変更の監視
アプリケーションは 1 つの変更ストリームを開いて、データベースのすべての非システム コレクションを監視できます。 このような変更ストリームを作成するには、 MongoDatabase.watch()メソッドのいずれかを使用します。
次の例では、変更ストリームは、指定されたデータベースで観察されたすべての変更を出力します。
database.watch().subscribe(new PrintDocumentSubscriber());
すべてのデータベースでの変更の監視
アプリケーションは 1 つの変更ストリームを開いて、MongoDB 配置内のすべてのデータベースのすべての非システム コレクションを監視できます。 このような変更ストリームを作成するには、 MongoClient.watch()メソッドのいずれかを使用します。
次の例では、変更ストリームは、 MongoClientが接続されている配置で観察されたすべての変更を出力します。
client.watch().subscribe(new PrintDocumentSubscriber());
コンテンツのフィルタリング
集計ステージのリストをwatch()メソッドに渡して、 $changeStream演算子によって返されるデータを変更できます。
注意
すべての集計演算子がサポートされているわけではありません。 詳細については、サーバー マニュアルの「 Change Streams 」を参照してください。
次の例では、変更ストリームは、 insert 、 update 、 replace 、 deleteの操作に対応するために観察されたすべての変更を出力します。
まず、パイプラインにはoperationTypeがinsert 、 update 、 replace 、またはdeleteのいずれかであるドキュメントをフィルタリングする$matchステージが含まれています。 次に、 fullDocumentをFullDocument.UPDATE_LOOKUPに設定し、更新後のドキュメントが結果に含まれるようにします。
collection.watch( asList( Aggregates.match( Filters.in("operationType", asList("insert", "update", "replace", "delete")) ) ) ).fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(new PrintDocumentSubscriber());