MongoDB.local SF, Jan 15: See the speaker lineup & ship your AI vision faster. Use WEB50 to save 50%
Find out more >
Docs Menu
Docs Home
/ /
コレクション

db.collection.watch()

db.collection.watch( pipeline, options )

重要

mongosh メソッド

このページでは、mongosh メソッドについて記載しています。ただし、データベースコマンドや Node.js などの言語固有のドライバーのドキュメントには該当しません

データベースコマンドについては、 $changeStream 集計ステージの aggregate コマンドを参照してください。

MongoDB API ドライバーについては、各言語の MongoDB ドライバー ドキュメントを参照してください。

レプリカセットとシャーディングされたクラスターのみ

コレクション上で変更ストリーム カーソルを開きます。

Parameter
タイプ
説明

pipeline

配列

任意。次の集計ステージの1つ以上で構成される集計パイプライン:

変更イベント出力をフィルタリング/修正するためのパイプラインを指定します。

MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。

options

ドキュメント

options ドキュメントには、次のフィールドと値を含めることができます。

フィールド
タイプ
説明

resumeAfter

ドキュメント

任意。 watch()再開トークンで指定された操作の後に再開通知を試行するように に指示します。

各変更ストリームのイベント ドキュメントには、_id フィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの _id フィールド全体を渡します。

resumeAfter は、startAfter および startAtOperationTime と排他関係にあります。

startAfter

ドキュメント

任意。 watch()再開トークンで指定された操作の後に新しい変更ストリームの開始を試行するように に指示します。無効化イベント後に通知を再開できるようにします。

各変更ストリームのイベント ドキュメントには、_id フィールドとして再開トークンが置かれます。変更後に再開する操作を表す変更イベント ドキュメントの _id フィールド全体を渡します。

startAfter は、resumeAfter および startAtOperationTime と排他関係にあります。

fullDocument

string

任意。 デフォルトでは 、アップデートされたドキュメント全体ではなく、watch() アップデート操作によって変更されたフィールドのデルタが返されます。

fullDocument"updateLookup" watch()に設定すると、更新されたドキュメントの過半数がコミットした最新のバージョンを参照するよう に指示します。watch()fullDocumentは、updateDescription デルタに加えて、ドキュメント検索を含む フィールドを返します。

batchSize

整数

任意。MongoDB クラスターからのレスポンスの各バッチで返す変更イベントの最大数を指定します。

cursor.batchSize() と同じ機能を持ちます。

maxAwaitTimeMS

整数

任意。空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまでサーバーが待機する最大時間(ミリ秒)。

デフォルトは 1000 ミリ秒です。

collation

ドキュメント

任意。 照合ドキュメントを渡し、変更ストリーム カーソルの照合順序を指定します。

省略する場合は simple バイナリ比較がデフォルトです。

startAtOperationTime

タイムスタンプ

任意。変更ストリームの開始点。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。oplog の時間範囲を確認するには、rs.printReplicationInfo() を参照してください。

startAtOperationTime は、resumeAfter および startAfter と排他関係にあります。

次の値を返します。MongoDB 配置への接続が開いていて、コレクションが存在する限り開いたままのカーソル。変更イベント ドキュメントの例については、「変更イベント」を参照してください。

Tip

このメソッドは、次の環境でホストされている配置で使用できます。

  • MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです

注意

このコマンドは、すべての MongoDB Atlas クラスターでサポートされています。すべてのコマンドに対する Atlas のサポートについては、「サポートされていないコマンド」を参照してください。

  • MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン

  • MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン

db.collection.watch() は次のように、レプリカ セットおよびシャーディングされたクラスターの配置で利用できます。

  • レプリカセットの場合、データを保持している任意のメンバーに対してdb.collection.watch()を発行できます。

  • シャーディングされたクラスターの場合、 インスタンスでdb.collection.watch() mongosを発行する必要があります。

db.collection.watch()Wired Tiger ストレージ エンジンでのみ使用できます。

MongoDB 4.2 以降では、"majority" の読み取り保証 (read concern)のサポートに関係なく変更ストリームが利用可能になりました。つまり、変更ストリームを使用する際、読み取り保証 (read concern) のmajority サポートを有効にする(デフォルト)か無効にするかを選択できます。

MongoDB 4.0 以前では、変更ストリームは、"majority" 読み取り保証(read concern)サポートが有効(デフォルト)の場合にのみ使用できます。

  • db.collection.watch() データを保持しているノードの大半に反映されたデータ変更についてのみ通知します。

  • 変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。

    • カーソルが明示的に閉じている。

    • 無効化イベントが発生している(コレクションの削除や名前の変更など)。

    • MongoDB 配置への接続が閉じているか、タイムアウトになっている。詳細については、「カーソルの動作」を参照してください。

    • 配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じてしまい、閉じた変更ストリームのカーソルが完全に再開できなくなることがあります。

MongoDBドライバーとは異なり、 mongoshはエラー後に変更ストリーム カーソルの再開を自動的に試行しません。 MongoDB ドライバーは、特定のエラーが発生後に 変更ストリーム カーソルの自動的な再開を1 回試行します。

db.collection.watch()は、oplog に保存されている情報を使用して変更イベントの説明を生成し、その操作に関連付けられた再開トークンを生成します。 resumeAfterまたはstartAfterオプションに渡される再開トークンによって識別される操作がすでにoplogから削除されている場合、 db.collection.watch()は変更ストリームを再開できません。

変更ストリームの再開の詳細については、「変更ストリームの再開」を参照してください。

注意

  • 無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、 resumeAfterを使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.

  • 配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じてしまい、閉じた変更ストリームのカーソルが完全に再開できなくなることがあります。

注意

無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、 resumeAfterを使用して変更ストリームを再開することはできません。 Instead, you can use startAfter to start a new change stream after an invalidate event.

デフォルトでは、変更ストリーム カーソルはアップデート操作における特定のフィールドの変更またはデルタを返します。また、変更されたドキュメントのうち、過半数がコミットした最新のバージョンを検索して返すように変更ストリームを構成できます。アップデートと検索の間に他の書き込み操作が行われた場合、返されるドキュメントがアップデート実行時のドキュメントと大幅に異なる可能性があります。

アップデート操作中に適用された変更の数と完全なドキュメントのサイズによっては、アップデート操作における変更イベント ドキュメントのサイズが BSON ドキュメントの制限である 16 MB を超えるリスクがあります。サイズが超過した場合、サーバーで変更ストリーム カーソルが閉じられ、エラーが返されます。

アクセス制御を使用して実行中の場合、ユーザーは コレクション リソースに対して find および changeStream の特権アクションを持っている必要があります。つまり、ユーザーは、次の特権を付与するロールを持っていなければなりません。

{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }

組み込みの read ロールにより、適切な権限が付与されます。

MongoDB には、カーソルを反復処理する方法が複数用意されています。

cursor.hasNext() メソッドの場合、ブロックして次のイベントを待機します。watchCursor カーソルを監視してイベントを反復処理するには、次のように hasNext() を使用します。

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

cursor.tryNext() メソッドはノンブロッキングです。watchCursor カーソルを監視してイベントを反復処理するには、次のように tryNext() を使用します。

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

次の操作は、 data.sensors コレクションに対し変更ストリーム カーソルを開きます。

watchCursor = db.getSiblingDB("data").sensors.watch()

カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.tryNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。

注意

変更ストリームでは isExhausted() を使用できません。

fullDocument オプションを "updateLookup" に設定すると、アップデートされた変更ストリーム イベントに関連するドキュメントについて、過半数がコミットした最新のバージョンを参照するよう、変更ストリーム カーソルに指示します。

次の操作は、 fullDocument : "updateLookup"オプションを使用して、data.sensors コレクションに対し変更ストリーム カーソルを開きます。

watchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ fullDocument : "updateLookup" }
)

カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.tryNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

どのアップデート操作も、変更イベントはドキュメント検索の結果を fullDocument フィールドに返します。

fullDocumentのアップデート出力の例については、「変更ストリーム アップデート イベント」を参照してください。

変更ストリーム出力の詳細なドキュメントについては、 「変更イベント」を参照してください。

注意

MongoDB 4.2 以降では、変更ストリームの集計パイプラインでイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。

次の操作は、集約パイプラインを使用して data.sensors コレクションに対する変更ストリーム カーソルを開き、insert イベントのみをフィルタリングします。

watchCursor = db.getSiblingDB("data").sensors.watch(
[
{ $match : {"operationType" : "insert" } }
]
)

カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.hasNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。

while (!watchCursor.isClosed()){
if (watchCursor.hasNext()){
printjson(watchCursor.next());
}
}

変更ストリーム カーソルは、 operationTypeinsert である変更イベントのみを返します。変更ストリームの出力における詳細なドキュメントについては、「変更イベント」を参照してください。

変更ストリーム カーソルによって返されるすべてのドキュメントには、_id フィールドとして再開トークンが含まれます。変更ストリームを再開するには、再開する変更イベントの _id ドキュメント全体を watch()resumeAfter または startAfter オプションに渡します。

次の操作では、再開トークンを使用して data.sensors コレクションに対する変更ストリーム カーソルを再開します。再開トークンを生成した操作が、クラスターの oplog からロール オフされていないことを前提としています。

let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;
while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}
watchCursor.close();
let resumeToken = firstChange._id;
resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ resumeAfter : resumeToken }
)

カーソルを反復処理し、新しいイベントをチェックする。cursor.isClosed() メソッドを cursor.hasNext() メソッドと組み合わせて使用し、変更ストリーム カーソルが閉じられ、かつ最新のバッチにオブジェクトが残っていない場合にのみループが終了するようにします。

while (!resumedWatchCursor.isClosed()){
if (resumedWatchCursor.hasNext()){
print(resumedWatchCursor.next());
}
}

変更ストリームの再開に関する詳細なドキュメントについては、「変更ストリームの再開」を参照してください。

戻る

db.collection.validate