Docs 菜单

Docs 主页开发应用程序MongoDB Manual

db.watch()

在此页面上

  • 定义
  • 可用性
  • 部署
  • 引擎加密
  • 读关注 majority 支持
  • 行为
  • 可恢复性
  • 更新操作的完整文件查询
  • 访问控制
  • 游标迭代
  • 例子
db.watch( pipeline, options )

仅适用于副本集和分片集群

打开变更流游标,以便数据库报告其所有非 system 集合的情况。

范围
类型
说明
pipeline
阵列

可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:

指定用于筛选/修改变更事件输出的管道。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

options
文档
可选。用于修改 db.watch()行为的其他选项。

options 文档可包含以下字段和值:

字段
类型
说明
resumeAfter
文档

可选。指示db.watch()尝试在恢复令牌中指定的操作之后开始恢复通知。

每份变更流事件文档都包含一个恢复令牌作为 _id 字段。传递变更事件文档的整个 _id 字段,代表之后想要恢复的操作。

resumeAfterstartAfterstartAtOperationTime 互斥。

startAfter
文档

可选。指示db.watch()在恢复令牌中指定的操作后尝试启动新的变更流。允许在无效事件后恢复通知。

每份变更流事件文档都包含一个恢复令牌作为 _id 字段。传递变更事件文档的整个 _id 字段,代表之后想要恢复的操作。

startAfterresumeAfterstartAtOperationTime 互斥。

4.2 版本中的新增功能

fullDocument
字符串

可选。默认情况下, db.watch()返回通过更新操作修改的字段的增量,而不是整个更新后的文档。

fullDocument设置为"updateLookup" ,以指示db.watch()查找更新文档的最新多数提交版本。 db.watch()除了updateDescription增量之外,还会返回一个包含文档查找的fullDocument字段。

从 MongoDB 6.0 开始,您可将 fullDocument 设为:

  • "whenAvailable" 在插入、替换或更新文档之后输出文档后像(如果有)。

  • "required" 在插入、替换或更新文档之后输出文档后像。如果后像不可用,则引发错误。

fullDocumentBeforeChange
字符串

可选。默认值为 "off"

从 MongoDB 6.0 开始,您可以使用新的 fullDocumentBeforeChange 字段并将其设置为:

  • "whenAvailable" 在替换、更新或删除文档之前输出文档前像(如果有)。

  • "required" 在替换、更新或删除文档之前输出文档前像。如果前像不可用,则引发错误。

  • "off" 抑制文档前像。"off" 是默认值。

batchSize
int

可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。

功能与 cursor.batchSize() 相同。

maxAwaitTimeMS
int

可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000 毫秒。

collation
文档

可选。传递排序规则文档以指定变更流游标的排序规则。

如果省略,则默认为 simple 二进制比较。

startAtOperationTime
时间戳

可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅 rs.printReplicationInfo()

startAtOperationTimeresumeAfterstartAfter 互斥。

返回:游标位于更改事件文档上。有关更改事件文档的示例,请参阅更改事件

提示

另请参阅:

db.watch() 适用于副本集和分片集群:

  • 对于副本集,您可以在任何数据承载节点上发出db.watch()

  • 对于分片集群,您必须在 实例上发出db.watch() mongos

您只能将db.watch()Wired Tiger 存储引擎一起使用。

从 MongoDB 4.2 开始,无论是否有 "majority" 读关注支持,change stream 都可用;也就是说,读关注 majority 支持可以启用(默认),也可以禁用以使用 change stream。

在 MongoDB 4.0 及更早版本中,仅当启用 "majority" 读关注支持时,才能使用变更流

  • 不能在adminlocalconfig数据库上运行db.watch()

  • db.watch() 仅通知持续到大多数数据承载节点的数据更改。

  • 变更流游标保持打开状态,直到出现以下任一情况:

    • 游标已明确关闭。

    • 发生失效事件;例如删除或重命名集合。

    • 与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为

    • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。

  • 您可以为不存在的数据库运行db.watch() 。但是,一旦创建了数据库并且删除了该数据库,变更流游标就会关闭。

与 MongoDB 驱动程序不同, mongosh不会在出错后自动尝试恢复变更流游标。 MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标。

db.watch()使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给resumeAfterstartAfter选项的恢复令牌标识的操作已经删除oplog ,则db.watch()无法恢复变更流。

有关恢复变更流的更多信息,请参阅恢复变更流

注意

  • 在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。从 MongoDB 4.2 开始,可使用 StartAfter无效事件后启动新的变更流。

  • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。

注意

在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。从 MongoDB 4.2 开始,可使用 StartAfter无效事件后启动新的变更流。

默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。

根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。

使用访问控制来运行时,用户必须对数据库资源具有 findchangeStream 特权动作。换言之,用户必须具有能授予以下特权角色

{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream"] }

内置 read 角色提供了相应的特权。

MongoDB 提供多种在游标上迭代的方法。

cursor.hasNext() 方法会阻塞并等待下一个事件。要监控 watchCursor 游标并迭代事件,请使用 hasNext(),如下所示:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

cursor.tryNext() 方法为非阻塞式。要监控 watchCursor 游标并迭代事件,请使用 tryNext(),如下所示:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

mongosh中的以下操作在hr数据库上打开变更流游标。返回的游标报告该数据库中所有非system集合的数据更改。

watchCursor = db.getSiblingDB("hr").watch()

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

如需变更流输出的完整文档,请参阅变更事件。

注意

不能将 isExhausted()变更流结合使用。

← db.version()