MongoDB.local SF, Jan 15: See the speaker lineup & ship your AI vision faster. Use WEB50 to save 50%
Find out more >
Docs 菜单
Docs 主页
/ /
集合

db.collection.watch()

db.collection.watch( pipeline, options )

重要

mongosh 方法

本页面提供 mongosh 方法的相关信息。这不是数据库命令或特定语言驱动程序(例如 Node.js)的相关文档。

有关数据库命令,请参阅附带 $changeStream 聚合阶段的 aggregate 命令。

如需了解 MongoDB API 驱动程序,请参阅特定语言的 MongoDB 驱动程序文档。

仅适用于副本集和分片集群

打开针对某一集合的变更流游标

Parameter
类型
说明

pipeline

阵列

可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成:

指定用于筛选/修改变更事件输出的管道。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

options

文档

可选。 用于修改watch() 行为的其他选项。

options 文档可包含以下字段和值:

字段
类型
说明

resumeAfter

文档

可选。 指示watch() 尝试在恢复令牌中指定的操作之后开始恢复通知。

每份变更流事件文档都包含一个恢复令牌作为 _id 字段。传递变更事件文档的整个 _id 字段,代表之后想要恢复的操作。

resumeAfterstartAfterstartAtOperationTime 互斥。

startAfter

文档

可选。 指示watch() 在恢复令牌中指定的操作后尝试启动新的变更流。允许在无效事件后恢复通知。

每份变更流事件文档都包含一个恢复令牌作为 _id 字段。传递变更事件文档的整个 _id 字段,代表之后想要恢复的操作。

startAfterresumeAfterstartAtOperationTime 互斥。

fullDocument

字符串

可选。 默认情况下,watch() 返回通过更新操作修改的字段的增量,而不是整个更新后的文档。

将 设置为fullDocument "updateLookup"watch(),以指示 查找更新文档的最新多数提交版本。watch()fullDocument除了 增量外,还会返回一个包含文档查找的updateDescription 字段。

batchSize

int

可选。指定从 MongoDB 集群响应的每个批次中返回的变更事件的最大数量。

功能与 cursor.batchSize() 相同。

maxAwaitTimeMS

int

可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。

默认值为 1000 毫秒。

collation

文档

可选。 传递collation 文档,为 change stream 游标指定排序规则。

如果省略,则默认为 simple 二进制比较。

startAtOperationTime

时间戳

可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅 rs.printReplicationInfo()

startAtOperationTimeresumeAfterstartAfter 互斥。

返回:只要与 MongoDB 部署的连接保持打开集合存在,游标便会保持打开状态。请参阅变更事件,获取变更事件文档的示例。

提示

此方法可用于以下环境中托管的部署:

注意

所有 MongoDB Atlas 集群都支持此命令。有关 Atlas 对所有命令的支持的信息,请参阅不支持的命令

db.collection.watch() 可用于副本集和分片集群部署:

只能将 db.collection.watch()Wired Tiger 存储引擎一起使用。

从 MongoDB 4.2 开始,无论是否有 "majority" 读关注支持,change stream 都可用;也就是说,读关注 majority 支持可以启用(默认),也可以禁用以使用 change stream。

在 MongoDB 4.0 及更早版本中,仅当启用 "majority" 读关注支持时,才能使用变更流

  • db.collection.watch() 仅通知持续到大多数数据承载节点的数据更改。

  • 变更流游标保持打开状态,直到出现以下任一情况:

    • 游标已明确关闭。

    • 发生失效事件;例如删除或重命名集合。

    • 与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为

    • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。

与 MongoDB 驱动程序不同,mongosh 不会在出错后自动尝试恢复变更流游标。MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标一次

db.collection.watch() 使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给 resumeAfterstartAfter 选项的恢复令牌所标识的操作已经从 oplog 中删除,则 db.collection.watch() 无法恢复变更流。

有关恢复变更流的更多信息,请参阅恢复变更流

注意

  • 在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter无效事件后启动新的变更流。

  • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。

注意

在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter无效事件后启动新的变更流。

默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。

根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。

使用访问控制来运行时,用户必须对集合资源具有 findchangeStream 特权动作。换言之,用户必须具有能授予以下特权角色

{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }

内置 read 角色提供了相应的特权。

MongoDB 提供多种在游标上迭代的方法。

cursor.hasNext() 方法会阻塞并等待下一个事件。要监控 watchCursor 游标并迭代事件,请使用 hasNext(),如下所示:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

cursor.tryNext() 方法为非阻塞式。要监控 watchCursor 游标并迭代事件,请使用 tryNext(),如下所示:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

以下操作针对 data.sensors 集合打开 change stream 游标:

watchCursor = db.getSiblingDB("data").sensors.watch()

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

如需变更流输出的完整文档,请参阅变更事件。

注意

不能将 isExhausted()变更流结合使用。

fullDocument 选项设置为 "updateLookup",从而指示变更流游标查找与更新变更流事件关联的文档的最新多数提交版本。

以下操作使用fullDocument : "updateLookup"选项打开针对data.sensors集合的 change stream 游标。

watchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ fullDocument : "updateLookup" }
)

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

对于任何更新操作,变更事件都会在 fullDocument 字段中返回文档查找的结果。

有关完整文档更新输出的示例,请参阅变更流更新事件

如需变更流输出的完整文档,请参阅变更事件。

注意

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

以下操作使用聚合管道打开针对 data.sensors 集合的变更流游标,从而仅筛选 insert 事件:

watchCursor = db.getSiblingDB("data").sensors.watch(
[
{ $match : {"operationType" : "insert" } }
]
)

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.hasNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!watchCursor.isClosed()){
if (watchCursor.hasNext()){
printjson(watchCursor.next());
}
}

变更流游标仅返回 operationTypeinsert 的变更事件。如需变更流输出的完整文档,请参阅变更事件

变更流游标返回的每份文档均包含一个恢复令牌作为 _id 字段。要恢复变更流,请将要恢复的变更事件的整个 _id 文档传递给 watch()resumeAfterstartAfter 选项。

以下操作使用恢复令牌恢复针对 data.sensors 集合的变更流游标。这个示例假设生成恢复令牌的操作尚未从集群的 oplog 中删除。

let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;
while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}
watchCursor.close();
let resumeToken = firstChange._id;
resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ resumeAfter : resumeToken }
)

迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.hasNext() 方法,确保循环仅在变更流游标已关闭最新批次中没有剩余对象时才退出:

while (!resumedWatchCursor.isClosed()){
if (resumedWatchCursor.hasNext()){
print(resumedWatchCursor.next());
}
}

请参阅恢复变更流,获取恢复变更流的完整文档。

后退

db.collection.validate()

在此页面上