db.collection.watch()
在此页面上
定义
db.collection.watch( pipeline, options )
重要
mongosh 方法
本页面提供
mongosh
方法的相关信息。这不是数据库命令或特定语言驱动程序(例如 Node.js)的相关文档。有关数据库命令,请参阅附带
$changeStream
聚合阶段的aggregate
命令。如需了解 MongoDB API 驱动程序,请参阅特定语言的 MongoDB 驱动程序文档。
仅适用于副本集和分片集群
打开针对某一集合的变更流游标。
Parameter类型说明pipeline
阵列
可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:
指定用于筛选/修改变更事件输出的管道。
options
文档
可选。用于修改
watch()
行为的其他选项。options
文档可包含以下字段和值:字段类型说明resumeAfter
文档
可选。指示
watch()
尝试在恢复令牌中指定的操作之后开始恢复通知。每份变更流事件文档都包含一个恢复令牌作为
_id
字段。传递变更事件文档的整个_id
字段,代表之后想要恢复的操作。resumeAfter
与startAfter
和startAtOperationTime
互斥。startAfter
文档
可选。指示
watch()
在恢复令牌中指定的操作后尝试启动新的变更流。允许在无效事件后恢复通知。每份变更流事件文档都包含一个恢复令牌作为
_id
字段。传递变更事件文档的整个_id
字段,代表之后想要恢复的操作。startAfter
与resumeAfter
和startAtOperationTime
互斥。fullDocument
字符串
fullDocumentBeforeChange
字符串
可选。
从 MongoDB 6.0 开始,您可以使用新的
fullDocumentBeforeChange
字段并将其设置为:"whenAvailable"
在替换、更新或删除文档之前输出文档前像(如果有)。"required"
在替换、更新或删除文档之前输出文档前像。如果前像不可用,则引发错误。"off"
抑制文档前像。"off"
是默认值。
batchSize
int
可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。
功能与
cursor.batchSize()
相同。maxAwaitTimeMS
int
可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。
默认值为
1000
毫秒。collation
文档
可选。传递排序规则文档以指定变更流游标的排序规则。
如果省略,则默认为
simple
二进制比较。showExpandedEvents
布尔
可选。从 MongoDB 6.0 开始,change stream 支持 DDL 事件的变更通知,如 createIndexes 和 dropIndexes 事件。要在 change stream 中包含扩展事件,请使用
showExpandedEvents
选项创建 change stream 游标。6.0 版本中的新功能。
startAtOperationTime
时间戳
可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅
rs.printReplicationInfo()
。startAtOperationTime
与resumeAfter
和startAfter
互斥。返回: 只要与 MongoDB 部署的连接保持打开且集合存在,游标便会保持打开状态。请参阅变更事件,获取变更事件文档的示例。
兼容性
此方法可用于以下环境中托管的部署:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
注意
所有 MongoDB Atlas 集群都支持此命令。有关 Atlas 对所有命令的支持的信息,请参阅不支持的命令。
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
可用性
部署
db.collection.watch()
可用于副本集和分片集群部署:
对于副本集,您可在任何承载数据的节点上发出
db.collection.watch()
。对于分片集群,您必须在一个
mongos
实例上发出db.collection.watch()
。
引擎加密
只能将 db.collection.watch()
与 Wired Tiger 存储引擎一起使用。
读关注majority
支持
无论是否支持"majority"
读关注(read concern), 变更流 都可用;也就是说,可以启用(默认)或majority
禁用 读关注(read concern) 支持,以使用变更流。
行为
db.collection.watch()
仅通知持续到大多数数据承载节点的数据更改。变更流游标保持打开状态,直到出现以下任一情况:
可恢复性
与 MongoDB 驱动程序不同,mongosh
不会在出错后自动尝试恢复变更流游标。MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标一次。
db.collection.watch()
使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给 resumeAfter
或 startAfter
选项的恢复令牌所标识的操作已经从 oplog 中删除,则 db.collection.watch()
无法恢复变更流。
有关恢复变更流的更多信息,请参阅恢复变更流。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter
来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter
来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。
更新操作的完整文件查询
默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。
根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。
访问控制
使用访问控制来运行时,用户必须对集合资源具有 find
和 changeStream
特权动作。换言之,用户必须具有能授予以下特权的角色:
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
内置 read
角色提供了相应的特权。
游标迭代
MongoDB 提供多种在游标上迭代的方法。
cursor.hasNext()
方法会阻塞并等待下一个事件。要监控 watchCursor
游标并迭代事件,请使用 hasNext()
,如下所示:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
该 cursor.tryNext()
方法为非阻塞式。要监控 watchCursor
游标并迭代事件,请使用 tryNext()
,如下所示:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
示例
打开变更流
以下操作针对 data.sensors
集合打开 change stream 游标:
watchCursor = db.getSiblingDB("data").sensors.watch()
迭代游标以检查是否存在新事件。使用 cursor.isClosed()
方法和 cursor.tryNext()
方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
如需变更流输出的完整文档,请参阅变更事件。
注意
不能将 isExhausted()
与变更流结合使用。
通过完整文档更新查找 Change Stream
将 fullDocument
选项设置为 "updateLookup"
,从而指示变更流游标查找与更新变更流事件关联的文档的最新多数提交版本。
以下操作使用fullDocument : "updateLookup"
选项打开针对data.sensors
集合的 change stream 游标。
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
迭代游标以检查是否存在新事件。使用 cursor.isClosed()
方法和 cursor.tryNext()
方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
对于任何更新操作,变更事件都会在 fullDocument
字段中返回文档查找的结果。
有关完整文档更新输出的示例,请参阅变更流更新事件。
如需变更流输出的完整文档,请参阅变更事件。
附带文档前映像和后映像的变更流
从 MongoDB 6.0 开始,可使用变更流事件来输出更改前后的文档版本(文档前映像和后映像):
前映像是指被替换、更新或删除之前的文档。已插入的文档没有前映像。
后图像是插入、替换或更新后的文档。 已删除的文档没有后图像。
使用
db.createCollection()
、create
或collMod
为集合启用changeStreamPreAndPostImages
。
如果图像属于以下情况,则前像和后像不可用于变更流事件:
在文档更新或删除操作时未对集合启用。
在
expireAfterSeconds
中设置的前像和后像保留时间后之后被删除。以下示例将整个集群上的
expireAfterSeconds
设置为100
秒:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 以下示例返回当前的
changeStreamOptions
设置,包括expireAfterSeconds
:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) 将
expireAfterSeconds
设置为off
可使用默认保留策略:将保留前像和后像,直到从 oplog 中删除对应的变更流事件。如果变更流事件从 oplog 中删除,则无论
expireAfterSeconds
前映像和后映像保留时间如何,相应的前映像和后映像也会被删除。
其他考量:
启用前像和后像会占用存储空间并增加处理时间。仅在需要时启用前像和后像。
将变更流事件大小限制为小于 16 MB。要限制事件大小,您可以:
将文档大小限制为 8 MB。如果其他 change stream 事件字段(例如
updateDescription
)不是很大,则可以在 change stream 输出中同时请求更新前的文档和更新后的文档。如果其他变更流事件字段(例如
updateDescription
)并不大,则仅请求变更流输出中最多 16 MB 的文档的后像。在以下情况下,仅请求最大 16 MB 的文档的 change stream 输出中的预映像:
文档更新仅影响文档结构或内容的一小部分,且
不会引起
replace
变更事件。replace
事件始终包含后像。
如要请求前图像,请在
db.collection.watch()
中将fullDocumentBeforeChange
设置为required
或whenAvailable
。如要请求后图像,请使用相同方法设置fullDocument
。前像被写入
config.system.preimages
集合。config.system.preimages
集合可能会变大。要限制集合大小,可如前文所示为前映像设置expireAfterSeconds
时间。前像由后台进程异步删除。
重要
向后不兼容的功能
从 MongoDB 6.0 开始,如果您将文档前图像和后图像用于 change stream,则必须使用 collMod
命令为每个集合禁用 changeStreamPreAndPostImages,然后才能降级到早期 MongoDB 版本。
创建集合
创建启用 changeStreamPreAndPostImages 的 temperatureSensor
集合:
db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } )
用温度读数填充 temperatureSensor
集合:
db.temperatureSensor.insertMany( [ { "_id" : 0, "reading" : 26.1 }, { "_id" : 1, "reading" : 25.9 }, { "_id" : 2, "reading" : 24.3 }, { "_id" : 3, "reading" : 22.4 }, { "_id" : 4, "reading" : 24.6 } ] )
以下部分显示了使用 temperatureSensor
集合的文档前映像和后映像的 change stream 示例。
使用文档前图像变更流
使用 fullDocumentBeforeChange: "whenAvailable"
设置输出文档前像(如有)。前像是指被替换、更新或删除之前的文档。插入的文档没有前像。
以下示例使用 fullDocumentBeforeChange:
"whenAvailable"
为 temperatureSensor
集合创建变更流游标:
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch( [], { fullDocumentBeforeChange: "whenAvailable" } )
以下示例使用游标检查新的变更流事件:
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) { if ( watchCursorFullDocumentBeforeChange.hasNext() ) { printjson( watchCursorFullDocumentBeforeChange.next() ); } }
在示例中:
while
循环将一直运行,直到游标关闭。如果该游标包含文档,则
hasNext()
返回true
。
以下示例更新temperatureSensor
文档的reading
字段:
db.temperatureSensor.updateOne( { _id: 2 }, { $set: { reading: 22.1 } } )
更新 temperatureSensor
文档后,变更事件会在 fullDocumentBeforeChange
字段中输出文档前像。前像包含更新之前的 temperatureSensor
文档reading
字段。例如:
{ "_id" : { "_data" : "82624B21...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1649090957, 1), "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 2 }, "updateDescription" : { "updatedFields" : { "reading" : 22.1 }, "removedFields" : [ ], "truncatedArrays" : [ ] }, "fullDocumentBeforeChange" : { "_id" : 2, "reading" : 24.3 } }
有文档后像的变更流
使用 fullDocument: "whenAvailable"
设置输出文档后像(如果可用)。后像是指插入、替换或更新后的文档。已删除的文档没有后像。
以下示例使用 fullDocument:
"whenAvailable"
为 temperatureSensor
集合创建变更流游标:
watchCursorFullDocument = db.temperatureSensor.watch( [], { fullDocument: "whenAvailable" } )
以下示例使用游标检查新的变更流事件:
while ( !watchCursorFullDocument.isClosed() ) { if ( watchCursorFullDocument.hasNext() ) { printjson( watchCursorFullDocument.next() ); } }
在示例中:
while
循环将一直运行,直到游标关闭。如果该游标包含文档,则
hasNext()
返回true
。
以下示例更新temperatureSensor
文档的reading
字段:
db.temperatureSensor.updateOne( { _id: 1 }, { $set: { reading: 29.5 } } )
更新 temperatureSensor
文档后,变更事件会在 fullDocument
字段中输出文档后图像。后图像包含更新之后的 temperatureSensor
文档reading
字段。例如:
{ "_id" : { "_data" : "8262474D...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1648840090, 1), "fullDocument" : { "_id" : 1, "reading" : 29.5 }, "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 1 }, "updateDescription" : { "updatedFields" : { "reading" : 29.5 }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
使用 Aggregation Pipeline 筛选器的 Change Stream
以下操作使用聚合管道打开针对 data.sensors
集合的变更流游标,从而仅筛选 insert
事件:
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
迭代游标以检查是否存在新事件。使用 cursor.isClosed()
方法和 cursor.hasNext()
方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
变更流游标仅返回 operationType
为 insert
的变更事件。如需变更流输出的完整文档,请参阅变更事件。
恢复 Change Stream
变更流游标返回的每份文档均包含一个恢复令牌作为 _id
字段。要恢复变更流,请将要恢复的变更事件的整个 _id
文档传递给 watch()
的 resumeAfter
或 startAfter
选项。
以下操作使用恢复令牌恢复针对 data.sensors
集合的变更流游标。这个示例假设生成恢复令牌的操作尚未从集群的 oplog 中删除。
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
迭代游标以检查是否存在新事件。使用 cursor.isClosed()
方法和 cursor.hasNext()
方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
请参阅恢复变更流,获取恢复变更流的完整文档。