Docs 菜单

Docs 主页开发应用程序MongoDB Manual

Mongo.watch()

在此页面上

  • 定义
  • 可用性
  • 部署
  • 引擎加密
  • 读关注 majority 支持
  • 行为
  • 可恢复性
  • 更新操作的完整文件查询
  • 可用性
  • 访问控制
  • 游标迭代
  • 例子
Mongo.watch( pipeline, options )

仅适用于副本集和分片集群

为副本集或分片集群打开变更流游标,可报告其数据库中的所有非 system 集合,adminlocalconfig 数据库除外。

范围
类型
说明
pipeline
阵列

可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:

指定用于筛选/修改变更事件输出的管道。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

options
文档
可选。用于修改 Mongo.watch()行为的其他选项。

options 文档可包含以下字段和值:

字段
类型
说明
resumeAfter
文档

可选。指示Mongo.watch()尝试在恢复令牌中指定的操作之后开始恢复通知。

每份变更流事件文档都包含一个恢复令牌作为 _id 字段。传递变更事件文档的整个 _id 字段,代表之后想要恢复的操作。

resumeAfterstartAfterstartAtOperationTime 互斥。

startAfter
文档

可选。指示Mongo.watch()在恢复令牌中指定的操作后尝试启动新的变更流。允许在无效事件后恢复通知。

每份变更流事件文档都包含一个恢复令牌作为 _id 字段。传递变更事件文档的整个 _id 字段,代表之后想要恢复的操作。

startAfterresumeAfterstartAtOperationTime 互斥。

4.2 版本中的新增功能

fullDocument
字符串

可选。默认情况下, Mongo.watch()返回通过更新操作修改的字段的增量,而不是整个更新后的文档。

fullDocument设置为"updateLookup" ,以指示Mongo.watch()查找更新文档的最新多数提交版本。 Mongo.watch()除了updateDescription增量之外,还会返回一个包含文档查找的fullDocument字段。

batchSize
int

可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。

功能与 cursor.batchSize() 相同。

maxAwaitTimeMS
int

可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000 毫秒。

collation
文档

可选。传递排序规则文档以指定变更流游标的排序规则。

如果省略,则默认为 simple 二进制比较。

startAtOperationTime
时间戳

可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅 rs.printReplicationInfo()

startAtOperationTimeresumeAfterstartAfter 互斥。

返回:游标位于更改事件文档上。有关更改事件文档的示例,请参阅更改事件

提示

另请参阅:

Mongo.watch() 适用于副本集和分片集群:

您只能将Mongo.watch()Wired Tiger 存储引擎一起使用。

无论是否支持"majority" 读关注, 变更流 都可用;也就是说,可以启用(默认)或majority 禁用 读关注 支持,以使用变更流。

  • Mongo.watch() 仅通知持续到大多数数据承载节点的数据更改。

  • 变更流游标保持打开状态,直到出现以下任一情况:

    • 游标已明确关闭。

    • 发生失效事件;例如删除或重命名集合。

    • 与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为

    • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。

与 MongoDB 驱动程序不同, mongosh不会在出错后自动尝试恢复变更流游标。 MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标。

Mongo.watch()使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给resumeAfterstartAfter选项的恢复令牌标识的操作已经删除oplog ,则Mongo.watch()无法恢复变更流。

有关恢复变更流的更多信息,请参阅恢复变更流

注意

  • 在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。从 MongoDB 4.2 开始,可使用 StartAfter无效事件后启动新的变更流。

  • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。

注意

恢复令牌

恢复令牌 _data 类型取决于 MongoDB 版本,在某些情况下,还取决于变更流打开/恢复时的功能兼容性版本 (fcv)(即 fcv 值的更改不会影响已经打开的变更流的恢复令牌):

MongoDB 版本
特征兼容性版本
恢复令牌 _data 类型
MongoDB 4.2 及更高版本
“4.2”或“4.0”
十六进制编码的字符串 (v1)
MongoDB 4.0.7 及更高版本
“4.0”或“3.6”
十六进制编码的字符串 (v1)
MongoDB 4.0.6 及更早版本
"4.0"
十六进制编码的字符串 (v0)
MongoDB 4.0.6 及更早版本
"3.6"
BinData
MongoDB 3.6
"3.6"
BinData

使用十六进制编码的字符串恢复令牌,您可以对恢复令牌进行比较和排序。

无论 fcv 值如何,4.0 部署都可以使用 BinData 恢复令牌或十六进制字符串恢复令牌来变更流。因此,4.0 部署可以使用在 3.6 部署的集合上打开的变更流中的恢复令牌。

MongoDB 版本中引入的新恢复令牌格式不能被早期 MongoDB 版本使用。

MongoDB 提供了"snippet" ,它是mongosh的扩展,用于解码十六进制编码的恢复令牌。

您可以安装并运行 恢复令牌 mongosh的代码片段:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

您还可以运行 resumetoken mongoshnpm如果系统上安装了 ,则在命令行中(不使用 ):

npx mongodb-resumetoken-decoder <RESUME TOKEN>

请参阅以下内容了解详细信息:

默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。

根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。

无论是否支持"majority" 读关注, 变更流 都可用;也就是说,可以启用(默认)或majority 禁用 读关注 支持,以使用变更流。

使用访问控制运行时,用户必须对所有数据库中的所有非系统集合具有 findchangeStream 权限操作。换言之,用户必须具有能授予以下特权角色

{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

内置 read 角色提供了相应的特权。

MongoDB 提供多种在游标上迭代的方法。

cursor.hasNext() 方法会阻塞并等待下一个事件。要监控 watchCursor 游标并迭代事件,请使用 hasNext(),如下所示:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

cursor.tryNext() 方法为非阻塞式。要监控 watchCursor 游标并迭代事件,请使用 tryNext(),如下所示:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

mongosh中的以下操作在副本集上打开变更流游标。返回的游标报告所有数据库中所有非system集合的数据更改, adminlocalconfig数据库除外。

watchCursor = db.getMongo().watch()

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

如需变更流输出的完整文档,请参阅变更事件。

注意

不能将 isExhausted()变更流结合使用。

← Mongo.setWriteConcern()
会话 →