定义
db.collection.watch( pipeline, options )重要
mongosh 方法
本页面提供
mongosh方法的相关信息。这不是数据库命令或特定语言驱动程序(例如 Node.js)的相关文档。有关数据库命令,请参阅附带
$changeStream聚合阶段的aggregate命令。如需了解 MongoDB API 驱动程序,请参阅特定语言的 MongoDB 驱动程序文档。
仅适用于副本集和分片集群
打开针对某一集合的变更流游标。
Parameter类型说明pipeline阵列
可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:
指定用于筛选/修改变更事件输出的管道。
options文档
可选。 用于修改
watch()行为的其他选项。options文档可包含以下字段和值:字段类型说明resumeAfter文档
可选。 指示
watch()尝试在恢复令牌中指定的操作之后开始恢复通知。每份变更流事件文档都包含一个恢复令牌作为
_id字段。传递变更事件文档的整个_id字段,代表之后想要恢复的操作。resumeAfter与startAfter和startAtOperationTime互斥。startAfter文档
可选。 指示
watch()在恢复令牌中指定的操作后尝试启动新的变更流。允许在无效事件后恢复通知。每份变更流事件文档都包含一个恢复令牌作为
_id字段。传递变更事件文档的整个_id字段,代表之后想要恢复的操作。startAfter与resumeAfter和startAtOperationTime互斥。fullDocument字符串
batchSizeint
可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。
功能与
cursor.batchSize()相同。maxAwaitTimeMSint
可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。
默认值为
1000毫秒。collation文档
可选。 传递collation 文档,为 change stream 游标指定排序规则。
如果省略,则默认为
simple二进制比较。startAtOperationTime时间戳
可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅
rs.printReplicationInfo()。startAtOperationTime与resumeAfter和startAfter互斥。返回: 只要与 MongoDB 部署的连接保持打开且集合存在,游标便会保持打开状态。请参阅变更事件,获取变更事件文档的示例。
兼容性
此方法可用于以下环境中托管的部署:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
注意
所有 MongoDB Atlas 集群都支持此命令。有关 Atlas 对所有命令的支持的信息,请参阅不支持的命令。
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
可用性
部署
db.collection.watch() 可用于副本集和分片集群部署:
对于副本集,您可在任何承载数据的节点上发出
db.collection.watch()。对于分片集群,您必须在一个
mongos实例上发出db.collection.watch()。
引擎加密
只能将 db.collection.watch() 与 Wired Tiger 存储引擎一起使用。
读关注majority 支持
从 MongoDB 4.2 开始,无论是否有 "majority" 读关注支持,change stream 都可用;也就是说,读关注 majority 支持可以启用(默认),也可以禁用以使用 change stream。
在 MongoDB 4.0 及更早版本中,仅当启用 "majority" 读关注支持时,才能使用变更流。
行为
db.collection.watch()仅通知持续到大多数数据承载节点的数据更改。变更流游标保持打开状态,直到出现以下任一情况:
可恢复性
与 MongoDB 驱动程序不同,mongosh 不会在出错后自动尝试恢复变更流游标。MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标一次。
db.collection.watch() 使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给 resumeAfter 或 startAfter 选项的恢复令牌所标识的操作已经从 oplog 中删除,则 db.collection.watch() 无法恢复变更流。
有关恢复变更流的更多信息,请参阅恢复变更流。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。
更新操作的完整文件查询
默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。
根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。
访问控制
使用访问控制来运行时,用户必须对集合资源具有 find 和 changeStream 特权动作。换言之,用户必须具有能授予以下特权的角色:
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
内置 read 角色提供了相应的特权。
游标迭代
MongoDB 提供多种在游标上迭代的方法。
cursor.hasNext() 方法会阻塞并等待下一个事件。要监控 watchCursor 游标并迭代事件,请使用 hasNext(),如下所示:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
该 cursor.tryNext() 方法为非阻塞式。要监控 watchCursor 游标并迭代事件,请使用 tryNext(),如下所示:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
示例
打开变更流
以下操作针对 data.sensors 集合打开 change stream 游标:
watchCursor = db.getSiblingDB("data").sensors.watch()
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
如需变更流输出的完整文档,请参阅变更事件。
注意
不能将 isExhausted() 与变更流结合使用。
通过完整文档更新查找 Change Stream
将 fullDocument 选项设置为 "updateLookup",从而指示变更流游标查找与更新变更流事件关联的文档的最新多数提交版本。
以下操作使用fullDocument : "updateLookup"选项打开针对data.sensors集合的 change stream 游标。
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
对于任何更新操作,变更事件都会在 fullDocument 字段中返回文档查找的结果。
有关完整文档更新输出的示例,请参阅变更流更新事件。
如需变更流输出的完整文档,请参阅变更事件。
使用 Aggregation Pipeline 筛选器的 Change Stream
以下操作使用聚合管道打开针对 data.sensors 集合的变更流游标,从而仅筛选 insert 事件:
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.hasNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
变更流游标仅返回 operationType 为 insert 的变更事件。如需变更流输出的完整文档,请参阅变更事件。
恢复 Change Stream
变更流游标返回的每份文档均包含一个恢复令牌作为 _id 字段。要恢复变更流,请将要恢复的变更事件的整个 _id 文档传递给 watch() 的 resumeAfter 或 startAfter 选项。
以下操作使用恢复令牌恢复针对 data.sensors 集合的变更流游标。这个示例假设生成恢复令牌的操作尚未从集群的 oplog 中删除。
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.hasNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
请参阅恢复变更流,获取恢复变更流的完整文档。