定義
Mongo.watch( pipeline, options )レプリカセットとシャーディングされたクラスターのみ
レプリカセットまたはシャーディングされたクラスターの変更ストリーム カーソルを開き、
admin、local、configデータベースを除き、データベース全体のsystem以外のすべてのコレクションについてレポートします。 。Parameterタイプ説明pipeline配列
任意。次の集計ステージの1つ以上で構成される集計パイプライン:
変更イベント出力をフィルタリング/修正するためのパイプラインを指定します。
MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
optionsドキュメント
optionsドキュメントには、次のフィールドと値を含めることができます。フィールドタイプ説明resumeAfterドキュメント
任意。
Mongo.watch()再開トークンで指定された操作の後に再開通知を試行するように に指示します。各変更ストリームのイベント ドキュメントには、
_idフィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_idフィールド全体を渡します。resumeAfterは、startAfterおよびstartAtOperationTimeと排他関係にあります。startAfterドキュメント
任意。
Mongo.watch()再開トークンで指定された操作の後に新しい変更ストリームの開始を試行するように に指示します。無効化イベント後に通知を再開できるようにします。各変更ストリームのイベント ドキュメントには、
_idフィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_idフィールド全体を渡します。startAfterは、resumeAfterおよびstartAtOperationTimeと排他関係にあります。fullDocumentstring
任意。 デフォルトでは 、アップデートされたドキュメント全体ではなく、
Mongo.watch()アップデート操作によって変更されたフィールドのデルタが返されます。fullDocumentを"updateLookup"Mongo.watch()に設定すると、更新されたドキュメントの過半数がコミットした最新のバージョンを参照するよう に指示します。Mongo.watch()fullDocumentは、updateDescriptionデルタに加えて、ドキュメント検索を含む フィールドを返します。batchSize整数
任意。MongoDB クラスターからのレスポンスの各バッチで返す変更イベントの最大数を指定します。
cursor.batchSize()と同じ機能を持ちます。maxAwaitTimeMS整数
任意。空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまでサーバーが待機する最大時間(ミリ秒)。
デフォルトは
1000ミリ秒です。collationドキュメント
任意。 照合ドキュメントを渡し、変更ストリーム カーソルの照合順序を指定します。
省略した場合、デフォルトは
simpleバイナリ比較になります。startAtOperationTimeタイムスタンプ
任意。変更ストリームの開始点。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。oplog の時間範囲を確認するには、
rs.printReplicationInfo()を参照してください。startAtOperationTimeは、resumeAfterおよびstartAfterと排他関係にあります。次の値を返します。 変更イベント ドキュメント上のカーソル。 変更イベント ドキュメントの例については、「変更イベント」を参照してください。 Tip
互換性
このメソッドは、次の環境でホストされている配置で使用できます。
MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです
MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン
MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン
可用性
配置
Mongo.watch() は、レプリカセットとシャーディングされたクラスターで利用できます。
レプリカセットの場合、データを保持している任意のメンバーに対して
Mongo.watch()を発行できます。シャーディングされたクラスターの場合、 インスタンスで
Mongo.watch()mongosを発行する必要があります。
ストレージ エンジン
読み取り保証 (read concern)majority サポート
変更ストリームは、 "majority"の読み取り保証 (read concern) のサポートに関係なく 使用できます 。つまり、変更ストリームを使用するには、読み取り保証(read concern) majorityサポートを有効にする(デフォルト)か無効にするかを選択できます。
動作
Mongo.watch()データを保持しているノードの大半に反映されたデータ変更についてのみ通知します。変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。
再開可能性
MongoDBドライバーとは異なり、 mongoshはエラー後に変更ストリーム カーソルの再開を自動的に試行しません。 MongoDB ドライバーは、特定のエラーが発生後に 変更ストリーム カーソルの自動的な再開を1 回試行します。
Mongo.watch()は、oplog に保存されている情報を使用して変更イベントの説明を生成し、その操作に関連付けられた再開トークンを生成します。 resumeAfterまたはstartAfterオプションに渡される再開トークンによって識別される操作がすでにoplogから削除されている場合、 Mongo.watch()は変更ストリームを再開できません。
変更ストリームの再開の詳細については、「変更ストリームの再開」を参照してください。
注意
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、
resumeAfterを使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じることがあります。閉じた変更ストリームのカーソルは完全に再開できない場合があります。
注意
再開トークン
16 進数でエンコードされたトークン
16 進数でエンコードされた string の再開トークンを使用すると、再開トークンを比較して並べ替えることができます。変更ストリームを再開するには、BinData 再開トークンまたは 16 進数文字列再開トークンのいずれかを使用します。
MongoDB バージョンで導入された新しい再開トークン形式は、以前のバージョンの MongoDB では使用できません。
再開トークンのデコード
MongoDB に備わる 「スニペット」は、16 進数でエンコードされた再開トークンを解読する mongosh の拡張機能です。
mongosh: から 再開トークン スニペットをインストールして実行できます。
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
システムに npm がインストールされている場合は、コマンドラインから再開トークンを実行することもできます(mongoshを使用せずに)。
npx mongodb-resumetoken-decoder <RESUME TOKEN>
下記についての詳細は、参照先をご覧ください。
アップデート 操作の完全なドキュメント検索
デフォルトでは、変更ストリーム カーソルはアップデート操作における特定のフィールドの変更またはデルタを返します。また、変更されたドキュメントのうち、過半数がコミットした最新のバージョンを検索して返すように変更ストリームを構成できます。アップデートと検索の間に他の書き込み操作が行われた場合、返されるドキュメントがアップデート実行時のドキュメントと大幅に異なる可能性があります。
アップデート操作中に適用された変更の数と完全なドキュメントのサイズによっては、アップデート操作における変更イベント ドキュメントのサイズが BSON ドキュメントの制限である 16 MB を超えるリスクがあります。サイズが超過した場合、サーバーで変更ストリーム カーソルが閉じられ、エラーが返されます。
可用性
変更ストリームは、 "majority"の読み取り保証 (read concern) のサポートに関係なく 使用できます 。つまり、変更ストリームを使用するには、読み取り保証(read concern) majorityサポートを有効にする(デフォルト)か無効にするかを選択できます。
アクセス制御
アクセス制御を使用して実行中の場合、ユーザーは すべてのデータベースにわたるすべての非システムfindchangeStream コレクション に対して および 特権アクションを持っている必要があります。つまり、ユーザーは、次の 特権 を付与する ロール を持っていなければなりません。
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
組み込みの read ロールにより、適切な権限が付与されます。
カーソルの反復
MongoDB には、カーソルを反復処理する方法が複数用意されています。
cursor.hasNext() メソッドの場合、ブロックして次のイベントを待機します。watchCursor カーソルを監視してイベントを反復処理するには、次のように hasNext() を使用します。
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
cursor.tryNext() メソッドはノンブロッキングです。watchCursor カーソルを監視してイベントを反復処理するには、次のように tryNext() を使用します。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
例
mongoshの次の操作は、レプリカセット上で変更ストリーム カーソルを開きます。 返されたカーソルは、system データベース、admin データベース、local データベースを除く全データベースにわたるすべてのconfig 以外のコレクションに対するデータ変更を報告します。
watchCursor = db.getMongo().watch()
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.tryNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。
注意
変更ストリームでは isExhausted() を使用できません。