定義
db.collection.watch( pipeline, options )重要
mongosh メソッド
このページでは、
mongoshメソッドについて記載しています。ただし、データベースコマンドや Node.js などの言語固有のドライバーのドキュメントには該当しません。データベースコマンドについては、
$changeStream集計ステージのaggregateコマンドを参照してください。MongoDB API ドライバーについては、各言語の MongoDB ドライバー ドキュメントを参照してください。
レプリカセットとシャーディングされたクラスターのみ
コレクション上で変更ストリーム カーソルを開きます。
Parameterタイプ説明pipeline配列
任意。次の集計ステージの1つ以上で構成される集計パイプライン:
変更イベント出力をフィルタリング/修正するためのパイプラインを指定します。
MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
optionsドキュメント
optionsドキュメントには、次のフィールドと値を含めることができます。フィールドタイプ説明resumeAfterドキュメント
任意。
watch()再開トークンで指定された操作の後に再開通知を試行するように に指示します。各変更ストリームのイベント ドキュメントには、
_idフィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_idフィールド全体を渡します。resumeAfterは、startAfterおよびstartAtOperationTimeと排他関係にあります。startAfterドキュメント
任意。
watch()再開トークンで指定された操作の後に新しい変更ストリームの開始を試行するように に指示します。無効化イベント後に通知を再開できるようにします。各変更ストリームのイベント ドキュメントには、
_idフィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの_idフィールド全体を渡します。startAfterは、resumeAfterおよびstartAtOperationTimeと排他関係にあります。fullDocumentstring
batchSize整数
任意。MongoDB クラスターからのレスポンスの各バッチで返す変更イベントの最大数を指定します。
cursor.batchSize()と同じ機能を持ちます。maxAwaitTimeMS整数
任意。空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまでサーバーが待機する最大時間(ミリ秒)。
デフォルトは
1000ミリ秒です。collationドキュメント
startAtOperationTimeタイムスタンプ
任意。変更ストリームの開始点。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。oplog の時間範囲を確認するには、
rs.printReplicationInfo()を参照してください。startAtOperationTimeは、resumeAfterおよびstartAfterと排他関係にあります。次の値を返します。 MongoDB 配置への接続が開いていて、コレクションが存在する限り開いたままのカーソル。変更イベント ドキュメントの例については、「変更イベント」を参照してください。 Tip
互換性
このメソッドは、次の環境でホストされている配置で使用できます。
MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです
注意
このコマンドは、すべての MongoDB Atlas クラスターでサポートされています。すべてのコマンドに対する Atlas のサポートについては、「サポートされていないコマンド」を参照してください。
MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン
MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン
可用性
配置
db.collection.watch() は次のように、レプリカ セットおよびシャーディングされたクラスターの配置で利用できます。
レプリカセットの場合、データを保持している任意のメンバーに対して
db.collection.watch()を発行できます。シャーディングされたクラスターの場合、 インスタンスで
db.collection.watch()mongosを発行する必要があります。
ストレージ エンジン
読み取り保証 (read concern)majority サポート
MongoDB 4.2 以降では、"majority" の読み取り保証 (read concern)のサポートに関係なく変更ストリームが利用可能になりました。つまり、変更ストリームを使用する際、読み取り保証 (read concern) のmajority サポートを有効にする(デフォルト)か無効にするかを選択できます。
MongoDB 4.0 以前では、変更ストリームは、"majority" 読み取り保証(read concern)サポートが有効(デフォルト)の場合にのみ使用できます。
動作
db.collection.watch()データを保持しているノードの大半に反映されたデータ変更についてのみ通知します。変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。
再開可能性
MongoDBドライバーとは異なり、 mongoshはエラー後に変更ストリーム カーソルの再開を自動的に試行しません。 MongoDB ドライバーは、特定のエラーが発生後に 変更ストリーム カーソルの自動的な再開を1 回試行します。
db.collection.watch()は、oplog に保存されている情報を使用して変更イベントの説明を生成し、その操作に関連付けられた再開トークンを生成します。 resumeAfterまたはstartAfterオプションに渡される再開トークンによって識別される操作がすでにoplogから削除されている場合、 db.collection.watch()は変更ストリームを再開できません。
変更ストリームの再開の詳細については、「変更ストリームの再開」を参照してください。
注意
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、
resumeAfterを使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じてしまい、閉じた変更ストリームのカーソルが完全に再開できなくなることがあります。
注意
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、 resumeAfterを使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.
アップデート 操作の完全なドキュメント検索
デフォルトでは、変更ストリーム カーソルはアップデート操作における特定のフィールドの変更またはデルタを返します。また、変更されたドキュメントのうち、過半数がコミットした最新のバージョンを検索して返すように変更ストリームを構成できます。アップデートと検索の間に他の書き込み操作が行われた場合、返されるドキュメントがアップデート実行時のドキュメントと大幅に異なる可能性があります。
アップデート操作中に適用された変更の数と完全なドキュメントのサイズによっては、アップデート操作における変更イベント ドキュメントのサイズが BSON ドキュメントの制限である 16 MB を超えるリスクがあります。サイズが超過した場合、サーバーで変更ストリーム カーソルが閉じられ、エラーが返されます。
アクセス制御
アクセス制御を使用して実行中の場合、ユーザーは コレクション リソースに対して find および changeStream の特権アクションを持っている必要があります。つまり、ユーザーは、次の特権を付与するロールを持っていなければなりません。
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
組み込みの read ロールにより、適切な権限が付与されます。
カーソルの反復
MongoDB には、カーソルを反復処理する方法が複数用意されています。
cursor.hasNext() メソッドの場合、ブロックして次のイベントを待機します。watchCursor カーソルを監視してイベントを反復処理するには、次のように hasNext() を使用します。
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
cursor.tryNext() メソッドはノンブロッキングです。watchCursor カーソルを監視してイベントを反復処理するには、次のように tryNext() を使用します。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
例
変更ストリームを開く
次の操作は、 data.sensors コレクションに対し変更ストリーム カーソルを開きます。
watchCursor = db.getSiblingDB("data").sensors.watch()
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.tryNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。
注意
変更ストリームでは isExhausted() を使用できません。
変更ストリームで updateLookup に fullDocument オプションを使用する
fullDocument オプションを "updateLookup" に設定すると、アップデートされた変更ストリーム イベントに関連するドキュメントについて、過半数がコミットした最新のバージョンを参照するよう、変更ストリーム カーソルに指示します。
次の操作は、 fullDocument : "updateLookup"オプションを使用して、data.sensors コレクションに対し変更ストリーム カーソルを開きます。
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.tryNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
どのアップデート操作も、変更イベントはドキュメント検索の結果を fullDocument フィールドに返します。
fullDocumentのアップデート出力の例については、「変更ストリーム アップデート イベント」を参照してください。
変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。
変更ストリームで集約パイプライン フィルターを使用する
注意
MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
次の操作は、集約パイプラインを使用して data.sensors コレクションに対する変更ストリーム カーソルを開き、insert イベントのみをフィルタリングします。
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.hasNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
変更ストリーム カーソルは、 operationType が insert である変更イベントのみを返します。変更ストリームの出力における詳細なドキュメントについては、「変更イベント」を参照してください。
変更ストリームの再開
変更ストリーム カーソルによって返されるすべてのドキュメントには、_id フィールドとして再開トークンが含まれます。変更ストリームを再開するには、再開する変更イベントの _id ドキュメント全体を watch() の resumeAfter または startAfter オプションに渡します。
次の操作では、再開トークンを使用して data.sensors コレクションに対する変更ストリーム カーソルを再開します。再開トークンを生成した操作が、クラスターの oplog からロール オフされていないことを前提としています。
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.hasNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
変更ストリームの再開に関する詳細なドキュメントについては、「変更ストリームの再開」を参照してください。