定义
$changeStream返回集合、数据库或整个集群上的变更流游标。 必须用作聚合管道中的第一阶段。
$changeStream阶段采用以下语法:{ $changeStream: { allChangesForCluster: <boolean>, fullDocument: <string>, fullDocumentBeforeChange: <string>, resumeAfter: <document> showExpandedEvents: <boolean>, startAfter: <document> startAtOperationTime: <timestamp> } } Parameter说明allChangesForCluster可选:设置变更流是否应包括集群中的所有更改。只能在
admin数据库中打开。fullDocument可选:指定在通过
update操作执行修改时,变更通知是否包含完整文档的副本。default:更改通知不包含update操作的完整文档。required:更改通知包括更改后立即出现的已修改文档的副本。如果找不到文档,变更流将引发错误。要使用该选项,必须先使用
collMod命令启用changeStreamPreAndPostImages选项。6.0 版本中的新功能。
updateLookup:更改通知包括由更改修改的文档的副本。该文档是当前多数提交文档,如果该多数提交文档不再存在,则为null。whenAvailable: 更改通知包括更改后立即出现的修改文档的副本,如果文档不可用,则为null。要使用该选项,必须先使用
collMod命令启用changeStreamPreAndPostImages选项。6.0 版本中的新功能。
对于部分更新,变更通知还提供变更的描述。
fullDocumentBeforeChange包括更改之前的完整文档。该字段接受以下值:
off:禁止包含更改前的文档。whenAvailable:包含更改前的文档。如果没有未经修改的文档,查询不会失败。required:包括更改之前的文档。如果未修改的文档不可用,则查询将失败。
resumeAfter可选。指定一个恢复令牌作为变更流的逻辑起点。无法用于在
invalidate事件之后恢复变更流。resumeAfter与startAfter和startAtOperationTime互斥。showExpandedEvents指定是否包含其他变更事件,例如 DDL 和索引操作。
6.0 版本中的新功能。
startAfter可选。指定一个恢复令牌作为变更流的逻辑起点。与
resumeAfter不同,startAfter可在出现invalidate事件之后通过创建新的变更流来恢复通知。startAfter与resumeAfter和startAtOperationTime互斥。startAtOperationTime指定一个时间作为变更流的逻辑起点。不能与
resumeAfter或startAfter字段一起使用。
稳定的 API 支持
变更流包含在Stable API V1 中。 但是, Stable APIV 中不包含 showExpandedEvents 选项。1
示例
如需使用聚合阶段创建变更流游标,请运行 aggregate 命令。
var cur = db.names.aggregate( [ { $changeStream: {} } ] )
要打开游标,请运行 cur。
变更流检测到更改时,next() 方法会返回更改事件通知。例如,运行 cur.next() 之后,MongoDB 会返回与以下类似的文档:
{ "_id": { _data: "8262E2EE54000000022B022C0100296E5A100448E5E3DD01364019AE8FE8C6859527E046645F6964006462E2EE54C8756C0D5CF6F0720004" }, "operationType": "insert", "clusterTime": Timestamp({ t: 1659039316, i: 2 }), "wallTime": ISODate("2022-07-28T20:15:16.148Z"), "fullDocument": { "_id": ObjectId("62e2ee54c8756c0d5cf6f072"), "name": "Walker Percy" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { _id: ObjectId("62e2ee54c8756c0d5cf6f072") } }
要使用MongoDB .NET/ C#驾驶员将 $changeStream 阶段添加到聚合管道,请对 PipelineDefinition对象调用 ChangeStream() 方法。
以下示例创建了一个返回变更流游标的管道阶段:
var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream();
您可以使用ChangeStreamStageOptions对象自定义变更流的行为。以下示例执行与上一示例相同的 $changeStream 操作,但指定了以下选项:
FullDocument选项指定,在通过更新操作进行修改时,变更通知不包含完整文档的副本。StartAtOperationTime选项指定变更流的逻辑点。
var changeStreamOptions = new ChangeStreamStageOptions() { FullDocument = ChangeStreamFullDocumentOption.Default, StartAtOperationTime = new BsonTimestamp(300), }; var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream(changeStreamOptions);
注意
首选 Watch() 方法
如果可能,请使用 IMongoCollection<TDocument>.Watch() 方法而不是此聚合阶段。仅当后续阶段项目掉恢复令牌 (_id) 或者不希望生成的游标自动恢复时,才使用 ChangeStream() 方法。
您可以同时使用 watch() 方法和 aggregate() 方法来执行 $changeStream 操作。当您将聚合管道传递给 MongoDB Collection 对象上的 watch() 方法时,$changeStream 会返回一个 ChangeStreamCursor。当您将聚合管道传递给 aggregate() 方法时,$changeStream 会返回 AggregationCursor。
以下示例创建并执行一个返回 ChangeStreamCursor 的管道:
const pipeline = [{ $changeStream: {} }]; const changeStream = collection.watch(pipeline); return changeStream;
您可以使用ChangeStreamOptions对象自定义变更流的行为。以下示例执行与上一示例相同的 $changeStream 操作,但指定了以下选项:
fullDocument选项指定,如果更新操作修改了文档,则变更通知将包含完整文档的副本。startAtOperationTime选项指定变更流的逻辑点。
const pipeline = [ { $changeStream: { fullDocument: 'updateLookup', startAtOperationTime: 3000 } } ]; const changeStream = collection.watch(pipeline); return changeStream;
了解详情
有关变更流通知的更多信息,请参阅变更事件。
要学习;了解有关相关管道阶段的更多信息,请参阅 $changeStreamSplitLargeEvent 指南。