定義
$changeStreamコレクション、データベース、またはクラスター全体に対して変更ストリームカーソルを返します。 集計パイプラインの最初のステージとして使用する必要があります。
$changeStreamステージの構文は次のとおりです。{ $changeStream: { allChangesForCluster: <boolean>, fullDocument: <string>, fullDocumentBeforeChange: <string>, resumeAfter: <document> showExpandedEvents: <boolean>, startAfter: <document> startAtOperationTime: <timestamp> } } Parameter説明allChangesForCluster任意: 変更ストリームにクラスター内のすべての変更を含めるかどうかを設定します。
adminデータベースでのみ開くことができます。fullDocument任意:
update操作によって変更された場合、変更通知に完全なドキュメントのコピーを含めるかどうかを指定します。default: 変更通知には、update操作の完全なドキュメントは含まれません。required: 変更通知には、変更の直後に表示される変更されたドキュメントのコピーが含まれます。 ドキュメントが見つからない場合、変更ストリームはエラーをスローします。このオプションを使用するには、まず
collModコマンドを使用してchangeStreamPreAndPostImagesオプションを有効にする必要があります。バージョン 6.0 で追加。
updateLookup: 変更通知には、変更によって変更されたドキュメントのコピーが含まれます。 このドキュメントは、現在の過半数がコミットしたドキュメントであるか、存在しない場合はnullです。whenAvailable: 変更通知には、変更の直後に表示されるように変更されたドキュメントのコピーが含まれます。ドキュメントが利用できない場合はnullです。このオプションを使用するには、まず
collModコマンドを使用してchangeStreamPreAndPostImagesオプションを有効にする必要があります。バージョン 6.0 で追加。
部分的な更新の場合は、変更通知にも変更の説明が記載されます。
fullDocumentBeforeChange変更前の完全なドキュメントを含めます。 このフィールドは、次の値を受け入れます。
off: 変更前のドキュメントを含めることを無効にします。whenAvailable: 変更前のドキュメントが含まれます。 変更されていないドキュメントが利用できない場合、クエリは失敗しません。required: 変更前のドキュメントが含まれます。 変更されていないドキュメントが利用できない場合、クエリは失敗します。
resumeAfter任意。 変更ストリームの論理的な開始点として再開トークンを指定します。
invalidateイベント後に変更ストリームを再開するために使用することはできません。resumeAfterは、startAfterおよびstartAtOperationTimeと排他関係にあります。showExpandedEventsDDL やインデックス操作などの追加の変更イベントを含めるかどうかを指定します。
バージョン 6.0 で追加。
startAfter任意。 変更ストリームの論理的な開始点として再開トークンを指定します。
resumeAfterとは異なり、startAfterは新しい変更ストリームを作成することで、invalidateイベント後に通知を再開できます。startAfterは、resumeAfterおよびstartAtOperationTimeと排他関係にあります。startAtOperationTime変更ストリームの論理的な開始点として時間を指定します。
resumeAfterまたはstartAfterフィールドとは併用できません。
Stable API でのサポート
変更ストリームはStable API V 1に含まれています。 ただし、 showExpandedEventsオプションは Stable API V 1に含まれていません。
例
集計ステージを使用して変更ストリーム カーソルを作成するには、 aggregateコマンドを実行します。
var cur = db.names.aggregate( [ { $changeStream: {} } ] )
カーソルを開くには、 curを実行します。
変更ストリームが変更を検出すると、 next()メソッドは変更イベント通知を返します。 たとえば、 cur.next()の実行後、MongoDB は次のようなドキュメントを返します。
{ "_id": { _data: "8262E2EE54000000022B022C0100296E5A100448E5E3DD01364019AE8FE8C6859527E046645F6964006462E2EE54C8756C0D5CF6F0720004" }, "operationType": "insert", "clusterTime": Timestamp({ t: 1659039316, i: 2 }), "wallTime": ISODate("2022-07-28T20:15:16.148Z"), "fullDocument": { "_id": ObjectId("62e2ee54c8756c0d5cf6f072"), "name": "Walker Percy" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { _id: ObjectId("62e2ee54c8756c0d5cf6f072") } }
MongoDB .NET/ C#ドライバーを使用して$changeStream ステージを集計パイプラインに追加するには、 PipelineDefinitionオブジェクトで ChangeStream() メソッドを呼び出します。
次の例では、変更ストリームカーソルを返すパイプラインステージを作成しています。
var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream();
ChangeStreamStageOptionsオブジェクトを使用して、変更ストリームの動作をカスタマイズできます。次の例では、前の例と同じ $changeStream操作を実行しますが、次のオプションを指定しています。
FullDocumentオプションは、アップデート操作によって変更された場合に、変更通知に完全なドキュメントのコピーを含めないように指定します。StartAtOperationTimeオプションは、変更ストリームの論理的な開始点を指定します。
var changeStreamOptions = new ChangeStreamStageOptions() { FullDocument = ChangeStreamFullDocumentOption.Default, StartAtOperationTime = new BsonTimestamp(300), }; var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream(changeStreamOptions);
注意
Watch() メソッドを使用する
可能な場合は、この集計ステージの代わりに IMongoCollection<TDocument>.Watch() メソッドを使用します。後続のステージで再開トークン(_id)がプロジェクト場合、または結果のカーソルを自動的に再開したくない場合にのみ、ChangeStream() メソッドを使用します。
watch() メソッドと aggregate() メソッドの両方を使用して、 $changeStream操作を実行できます。MongoDB Collectionオブジェクトの watch() メソッドに集計パイプラインを渡すと、$changeStream は ChangeStreamCursor を返します。集計パイプラインを aggregate() メソッドに渡すと、$changeStream から が返されます。AggregationCursor
重要
$changeStream の再開可能性
変更ストリームを aggregate() メソッドに渡すと、変更ストリームは再開できません。変更ストリームは、 watch() メソッドに渡された場合にのみ再開します。再開可能性の詳細については、変更ストリームの再開を参照してください。
次の例では、ChangeStreamCursor を返すパイプラインを作成して実行します。
const pipeline = [{ $changeStream: {} }]; const changeStream = collection.watch(pipeline); return changeStream;
ChangeStreamOptionsオブジェクトを使用して、変更ストリームの動作をカスタマイズできます。次の例では、前の例と同じ $changeStream操作を実行しますが、次のオプションを指定しています。
fullDocumentオプションは、アップデート操作によってドキュメントが変更される場合、変更通知に完全なドキュメントのコピーが含まれることを指定します。startAtOperationTimeオプションは、変更ストリームの論理的な開始点を指定します。
const pipeline = [ { $changeStream: { fullDocument: 'updateLookup', startAtOperationTime: 3000 } } ]; const changeStream = collection.watch(pipeline); return changeStream;
詳細
変更ストリーム通知の詳細については、「変更イベント 」を参照してください。
関連するパイプラインステージの詳細については、$changeStreamSplitLargeEventガイドを参照してください。