Mongo.watch()
定义
Mongo.watch( pipeline, options )
仅适用于副本集和分片集群
打开副本集或分分片集群的变更流游标,以报告其数据库中的所有非
system
集合,但admin
、local
和config
数据库除外。Parameter类型说明pipeline
阵列
可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:
指定用于筛选/修改变更事件输出的管道。
options
文档
可选。用于修改
Mongo.watch()
行为的其他选项。options
文档可包含以下字段和值:字段类型说明resumeAfter
文档
可选。指示
Mongo.watch()
尝试在恢复令牌中指定的操作之后开始恢复通知。每份变更流事件文档都包含一个恢复令牌作为
_id
字段。传递变更事件文档的整个_id
字段,代表之后想要恢复的操作。resumeAfter
与startAfter
和startAtOperationTime
互斥。startAfter
文档
可选。指示
Mongo.watch()
在恢复令牌中指定的操作后尝试启动新的变更流。允许在无效事件后恢复通知。每份变更流事件文档都包含一个恢复令牌作为
_id
字段。传递变更事件文档的整个_id
字段,代表之后想要恢复的操作。startAfter
与resumeAfter
和startAtOperationTime
互斥。fullDocument
字符串
可选。默认,
Mongo.watch()
返回通过更新操作修改的字段的增量,而不是整个更新后的文档。fullDocument
"updateLookup"
Mongo.watch()
Mongo.watch()
将fullDocument
设置为updateDescription
,以指示 查找更新文档的最新多数提交版本。 除了 增量外,还会返回一个包含文档查找的 字段。batchSize
int
可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。
功能与
cursor.batchSize()
相同。maxAwaitTimeMS
int
可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。
默认值为
1000
毫秒。collation
文档
可选。传递排序规则文档以指定变更流游标的排序规则。
如果省略,则默认为
simple
二进制比较。startAtOperationTime
时间戳
可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅
rs.printReplicationInfo()
。startAtOperationTime
与resumeAfter
和startAfter
互斥。返回: 游标位于更改事件文档上。有关更改事件文档的示例,请参阅更改事件。
兼容性
此方法可用于以下环境中托管的部署:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
可用性
部署
Mongo.watch()
适用于副本集和分片集群:
对于副本集,您可在任何承载数据的节点上发出
Mongo.watch()
。对于分片集群,您必须在一个
mongos
实例上发出Mongo.watch()
。
引擎加密
只能将 Mongo.watch()
与 Wired Tiger 存储引擎一起使用。
读关注majority
支持
无论是否支持"majority"
读关注(read concern), 变更流 都可用;也就是说,可以启用(默认)或majority
禁用 读关注(read concern) 支持,以使用变更流。
行为
Mongo.watch()
仅通知持续到大多数数据承载节点的数据更改。变更流游标保持打开状态,直到出现以下任一情况:
可恢复性
与 MongoDB 驱动程序不同,mongosh
不会在出错后自动尝试恢复变更流游标。MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标一次。
Mongo.watch()
使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给 resumeAfter
或 startAfter
选项的恢复令牌所标识的操作已经从 oplog 中删除,则 Mongo.watch()
无法恢复变更流。
有关恢复变更流的更多信息,请参阅恢复变更流。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter
来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。
注意
恢复令牌
恢复令牌 _data
类型取决于 MongoDB 版本,在某些情况下,还取决于变更流打开/恢复时的功能兼容性版本 (fcv)(即 fcv 值的更改不会影响已经打开的变更流的恢复令牌):
MongoDB 版本 | 特征兼容性版本 | 恢复令牌 _data 类型 |
---|---|---|
MongoDB 4.2 及更高版本 | “4.2”或“4.0” | 十六进制编码的字符串 ( |
MongoDB 4.0.7 及更高版本 | “4.0”或“3.6” | 十六进制编码的字符串 ( |
MongoDB 4.0.6 及更早版本 | "4.0" | 十六进制编码的字符串 ( |
MongoDB 4.0.6 及更早版本 | "3.6" | BinData |
MongoDB 3.6 | "3.6" | BinData |
十六进制编码令牌
使用十六进制编码的字符串恢复令牌,您可以对恢复令牌进行比较和排序。
无论 fcv 值如何,4.0 部署都可以使用 BinData 恢复令牌或十六进制字符串恢复令牌来变更流。因此,4.0 部署可以使用在 3.6 部署的集合上打开的变更流中的恢复令牌。
MongoDB 版本中引入的新恢复令牌格式不能被早期 MongoDB 版本使用。
解码恢复令牌
MongoDB 提供了“代码段”,这是 mongosh
的扩展,用于解码十六进制编码的恢复令牌。
您可以从 mongosh
安装和运行 resumetoken 代码段:
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
如果系统上安装了 npm
,那么您还可以在命令行中运行 resumetoken (并且不使用 mongosh
):
npx mongodb-resumetoken-decoder <RESUME TOKEN>
请参阅以下内容了解详细信息:
更新操作的完整文件查询
默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。
根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。
可用性
无论是否支持"majority"
读关注(read concern), 变更流 都可用;也就是说,可以启用(默认)或majority
禁用 读关注(read concern) 支持,以使用变更流。
访问控制
使用访问控制运行时,用户必须对所有数据库中的所有非系统集合具有 find
和 changeStream
权限操作。换言之,用户必须具有能授予以下特权的角色:
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
内置 read
角色提供了相应的特权。
游标迭代
MongoDB 提供多种在游标上迭代的方法。
cursor.hasNext()
方法会阻塞并等待下一个事件。要监控 watchCursor
游标并迭代事件,请使用 hasNext()
,如下所示:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
该 cursor.tryNext()
方法为非阻塞式。要监控 watchCursor
游标并迭代事件,请使用 tryNext()
,如下所示:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
例子
mongosh
中的以下操作会在副本集上打开一个变更流游标。返回的游标报告所有数据库中所有非 system
集合的数据变更,admin
、local
和 config
数据库除外。
watchCursor = db.getMongo().watch()
迭代游标以检查是否存在新事件。使用 cursor.isClosed()
方法和 cursor.tryNext()
方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
如需变更流输出的完整文档,请参阅变更事件。
注意
不能将 isExhausted()
与变更流结合使用。