定义
db.watch( pipeline, options )仅适用于副本集和分片集群
打开数据库的变更流游标,以报告其所有非
system集合的信息。Parameter类型说明pipeline阵列
可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:
指定用于筛选/修改变更事件输出的管道。
options文档
可选。 用于修改
db.watch()行为的其他选项。options文档可包含以下字段和值:字段类型说明resumeAfter文档
可选。指定一个恢复令牌作为变更流的逻辑起点。无法用于在
invalidate事件之后恢复变更流。resumeAfter与startAfter和startAtOperationTime互斥。startAfter文档
可选。指定一个恢复令牌作为变更流的逻辑起点。与
resumeAfter不同,startAfter可在出现invalidate事件之后通过创建新的变更流来恢复通知。startAfter与resumeAfter和startAtOperationTime互斥。fullDocument字符串
可选。 默认情况下,
db.watch()返回通过更新操作修改的字段的增量,而不是整个更新后的文档。将 设置为
fullDocument"updateLookup"db.watch(),以指示 查找更新文档的最新多数提交版本。db.watch()fullDocument除了 增量外,还会返回一个包含文档查找的updateDescription字段。从 MongoDB 6.0 开始,您可将
fullDocument设为:"whenAvailable"在插入、替换或更新文档之后输出文档后像(如果有)。"required"在插入、替换或更新文档之后输出文档后像。如果后像不可用,则引发错误。
fullDocumentBeforeChange字符串
可选。默认值为
"off"。从 MongoDB 6.0 开始,您可以使用新的
fullDocumentBeforeChange字段并将其设置为:"whenAvailable"在替换、更新或删除文档之前输出文档前像(如果有)。"required"在替换、更新或删除文档之前输出文档前像。如果前像不可用,则引发错误。"off"抑制文档前像。"off"是默认值。
batchSizeint
可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。
功能与
cursor.batchSize()相同。maxAwaitTimeMSint
可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。
默认值为
1000毫秒。collation文档
可选。传递排序规则文档以指定变更流游标的排序规则。
如果省略,则默认为
simple二进制比较。startAtOperationTime时间戳
可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅
rs.printReplicationInfo()。startAtOperationTime与resumeAfter和startAfter互斥。返回: 游标位于更改事件文档上。有关更改事件文档的示例,请参阅更改事件。
兼容性
此方法可用于以下环境中托管的部署:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
注意
所有 MongoDB Atlas 集群都支持此命令。有关 Atlas 对所有命令的支持的信息,请参阅不支持的命令。
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
可用性
部署
db.watch() 适用于副本集和分片集群:
对于副本集,您可在任何承载数据的节点上发出
db.watch()。对于分片集群,您必须在一个
mongos实例上发出db.watch()。
引擎加密
只能将 db.watch() 与 Wired Tiger 存储引擎一起使用。
读关注majority 支持
无论是否支持"majority" 读关注(read concern), 变更流 都可用;也就是说,可以启用(默认)或majority 禁用 读关注(read concern) 支持,以使用变更流。
行为
您不能在
admin、local或config数据库中运行db.watch()。db.watch()仅通知持续到大多数数据承载节点的数据更改。变更流游标保持打开状态,直到出现以下任一情况:
游标已明确关闭。
发生失效事件;例如删除或重命名集合。
如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。
您可以为不存在的数据库运行
db.watch()。但是,一旦创建了数据库并且删除了该数据库,变更流游标就会关闭。
可恢复性
与 MongoDB 驱动程序不同,mongosh 不会在出错后自动尝试恢复变更流游标。MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标一次。
db.watch() 使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给 resumeAfter 或 startAfter 选项的恢复令牌所标识的操作已经从 oplog 中删除,则 db.watch() 无法恢复变更流。
有关恢复变更流的更多信息,请参阅恢复变更流。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。
更新操作的完整文件查询
默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。
根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。
访问控制
使用访问控制来运行时,用户必须对数据库资源具有 find 和 changeStream 特权动作。换言之,用户必须具有能授予以下特权的角色:
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream"] }
内置 read 角色提供了相应的特权。
游标迭代
MongoDB 提供多种在游标上迭代的方法。
cursor.hasNext() 方法会阻塞并等待下一个事件。要监控 watchCursor 游标并迭代事件,请使用 hasNext(),如下所示:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
该 cursor.tryNext() 方法为非阻塞式。要监控 watchCursor 游标并迭代事件,请使用 tryNext(),如下所示:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
例子
mongosh 中的以下操作在 hr 数据库中打开变更流游标。返回的游标报告该数据库中所有非 system 集合的数据变更。
watchCursor = db.getSiblingDB("hr").watch()
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
如需变更流输出的完整文档,请参阅变更事件。
注意
不能将 isExhausted() 与变更流结合使用。