重要
mongosh 方法
本页面提供 mongosh 方法的相关信息。这不是数据库命令或特定语言驱动程序(例如 Node.js)的相关文档。
有关数据库命令,请参阅附带 $changeStream 聚合阶段的 aggregate 命令。
如需了解 MongoDB API 驱动程序,请参阅特定语言的 MongoDB 驱动程序文档。
定义
db.collection.watch( pipeline, options )仅适用于副本集和分片集群
打开针对某一集合的变更流游标。
返回: 只要与 MongoDB 部署的连接保持打开且集合存在,游标便会保持打开状态。请参阅变更事件,获取变更事件文档的示例。
参数
Parameter | 类型 | 说明 |
|---|---|---|
| 阵列 | 可选。Aggregation Pipeline 由以下一个或多个聚合阶段组成: 指定用于筛选/修改变更事件输出的管道。 |
| 文档 | 可选。 用于修改 |
字段
options 文档可包含以下字段和值:
字段 | 类型 | 说明 |
|---|---|---|
| 文档 | 可选。 指示 每份变更流事件文档都包含一个恢复令牌作为
|
| 文档 | 可选。 指示 每份变更流事件文档都包含一个恢复令牌作为
|
| 字符串 | |
| 字符串 | 可选。 从 MongoDB 6.0 开始,您可以使用新的
|
| int | 可选。每批变更流中可以返回的最大文档数量。默认情况下, 功能与 |
| int | 可选。在返回空批次之前,服务器等待新数据更改以报告给变更流游标的最长时间(以毫秒为单位)。 默认值为 |
| 文档 | 可选。传递排序规则文档以指定变更流游标的排序规则。 如果省略,则默认为 |
| 布尔 | 可选。从 MongoDB 6.0 开始,change stream 支持 DDL 事件的变更通知,如 createIndexes 和 dropIndexes 事件。要在 change stream 中包含扩展事件,请使用 6.0 版本中的新功能。 |
| 时间戳 | 可选。变更流的起点。如果指定的起点在过去,它必须在 oplog 的时间范围内。要检查 oplog 的时间范围,请参阅
|
兼容性
此方法可用于以下环境中托管的部署:
MongoDB Atlas:用于云中 MongoDB 部署的完全托管服务
注意
所有 MongoDB Atlas 集群都支持此命令。有关 Atlas 对所有命令的支持的信息,请参阅不支持的命令。
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
可用性
部署
db.collection.watch() 可用于副本集和分片集群部署:
对于副本集,您可在任何承载数据的节点上发出
db.collection.watch()。对于分片集群,您必须在一个
mongos实例上发出db.collection.watch()。
引擎加密
只能将 db.collection.watch() 与 Wired Tiger 存储引擎一起使用。
读关注majority 支持
无论是否支持"majority" 读关注(read concern), 变更流 都可用;也就是说,可以启用(默认)或majority 禁用 读关注(read concern) 支持,以使用变更流。
行为
db.collection.watch()仅通知持续到大多数数据承载节点的数据更改。变更流游标保持打开状态,直到出现以下任一情况:
游标已明确关闭。
发生失效事件;例如删除或重命名集合。
如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。
可恢复性
与 MongoDB 驱动程序不同,mongosh 不会在出错后自动尝试恢复变更流游标。MongoDB 驱动程序会在出现某些错误后尝试自动恢复变更流游标一次。
db.collection.watch() 使用存储在 oplog 中的信息生成变更事件描述,并生成与该操作相关的恢复令牌。如果传递给 resumeAfter 或 startAfter 选项的恢复令牌所标识的操作已经从 oplog 中删除,则 db.collection.watch() 无法恢复变更流。
有关恢复变更流的更多信息,请参阅恢复变更流。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。
注意
在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter 在无效事件后启动新的变更流。
更新操作的完整文件查询
默认情况下,变更流游标将返回用于更新操作的特定字段更改/增量。您还可以配置变更流,以查找并返回已更改的文档的最新多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档明显不同。
根据更新操作期间应用的变更数量和完整文档的大小,存在更新操作的变更事件文档的大小大于 16MB BSON 文档限制的风险。如果发生这种情况,服务器将关闭变更流游标并返回错误。
访问控制
使用访问控制来运行时,用户必须对集合资源具有 find 和 changeStream 特权动作。换言之,用户必须具有能授予以下特权的角色:
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
内置 read 角色提供了相应的特权。
游标迭代
MongoDB 提供多种在游标上迭代的方法。
cursor.hasNext() 方法会阻塞并等待下一个事件。要监控 watchCursor 游标并迭代事件,请使用 hasNext(),如下所示:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
该 cursor.tryNext() 方法为非阻塞式。要监控 watchCursor 游标并迭代事件,请使用 tryNext(),如下所示:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
示例
打开变更流
以下操作针对 data.sensors 集合打开 change stream 游标:
watchCursor = db.getSiblingDB("data").sensors.watch()
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
如需变更流输出的完整文档,请参阅变更事件。
注意
不能将 isExhausted() 与变更流结合使用。
通过完整文档更新查找 Change Stream
将 fullDocument 选项设置为 "updateLookup",从而指示变更流游标查找与更新变更流事件关联的文档的最新多数提交版本。
以下操作使用fullDocument : "updateLookup"选项打开针对data.sensors集合的 change stream 游标。
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.tryNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
对于任何更新操作,变更事件都会在 fullDocument 字段中返回文档查找的结果。
有关完整文档更新输出的示例,请参阅变更流更新事件。
如需变更流输出的完整文档,请参阅变更事件。
附带文档前映像和后映像的变更流
从 MongoDB 6.0 开始,可使用变更流事件来输出更改前后的文档版本(文档前映像和后映像):
前映像是指被替换、更新或删除之前的文档。已插入的文档没有前映像。
后图像是插入、替换或更新后的文档。 已删除的文档没有后图像。
使用
db.createCollection()、create或collMod为集合启用changeStreamPreAndPostImages。示例,使用collMod命令时:db.runCommand( { collMod: <collection>, changeStreamPreAndPostImages: { enabled: true } } )
如果图像属于以下情况,则前像和后像不可用于变更流事件:
在文档更新或删除操作时未对集合启用。
在
expireAfterSeconds中设置的前像和后像保留时间后之后被删除。以下示例将整个集群上的
expireAfterSeconds设置为100秒:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 注意
MongoDB Atlas集群中不支持
setClusterParameter命令。有关AtlasAtlas支持所有命令的信息,请参阅AtlasAtlas中不支持的命令。以下示例返回当前的
changeStreamOptions设置,包括expireAfterSeconds:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) 将
expireAfterSeconds设置为off可使用默认保留策略:将保留前像和后像,直到从 oplog 中删除对应的变更流事件。如果变更流事件从 oplog 中删除,则无论
expireAfterSeconds前映像和后映像保留时间如何,相应的前映像和后映像也会被删除。
其他考量:
启用前像和后像会占用存储空间并增加处理时间。仅在需要时启用前像和后像。
将变更流事件大小限制为小于 16 MiB。要限制事件大小,您可以:
将文档大小限制为 8 MB。如果其他 change stream 事件字段(例如
updateDescription)不是很大,则可以在 change stream 输出中同时请求更新前的文档和更新后的文档。如果其他变更流事件字段(例如
updateDescription)并不大,则仅请求变更流输出中最多 16 MiB 的文档的后像。在以下情况下,仅请求变更流输出中最多 16 MiB 的文档的前像:
文档更新仅影响文档结构或内容的一小部分,且
不会引起
replace变更事件。replace事件始终包含后像。
如要请求前图像,请在
db.collection.watch()中将fullDocumentBeforeChange设置为required或whenAvailable。如要请求后图像,请使用相同方法设置fullDocument。前像被写入
config.system.preimages集合。config.system.preimages集合可能会变大。要限制集合大小,可如前文所示为前映像设置expireAfterSeconds时间。前像由后台进程异步删除。
重要
向后不兼容的功能
从 MongoDB 6.0 开始,如果您将文档前图像和后图像用于 change stream,则必须使用 collMod 命令为每个集合禁用 changeStreamPreAndPostImages,然后才能降级到早期 MongoDB 版本。
提示
有关变更流事件和输出,请参阅变更事件。
要查看集合的变化,请参阅
db.collection.watch()。有关变更流输出的完整示例,请参阅具有文档前像和后像的变更流。
创建集合
创建启用 changeStreamPreAndPostImages 的 temperatureSensor 集合:
db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } )
用温度读数填充 temperatureSensor 集合:
db.temperatureSensor.insertMany( [ { "_id" : 0, "reading" : 26.1 }, { "_id" : 1, "reading" : 25.9 }, { "_id" : 2, "reading" : 24.3 }, { "_id" : 3, "reading" : 22.4 }, { "_id" : 4, "reading" : 24.6 } ] )
以下部分显示了使用 temperatureSensor 集合的文档前映像和后映像的 change stream 示例。
使用文档前图像变更流
使用 fullDocumentBeforeChange: "whenAvailable" 设置输出文档前像(如有)。前像是指被替换、更新或删除之前的文档。插入的文档没有前像。
以下示例使用 fullDocumentBeforeChange:
"whenAvailable" 为 temperatureSensor 集合创建变更流游标:
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch( [], { fullDocumentBeforeChange: "whenAvailable" } )
以下示例使用游标检查新的变更流事件:
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) { if ( watchCursorFullDocumentBeforeChange.hasNext() ) { printjson( watchCursorFullDocumentBeforeChange.next() ); } }
在示例中:
while循环将一直运行,直到游标关闭。如果该游标包含文档,则
hasNext()返回true。
以下示例更新temperatureSensor文档的reading字段:
db.temperatureSensor.updateOne( { _id: 2 }, { $set: { reading: 22.1 } } )
更新 temperatureSensor 文档后,变更事件会在 fullDocumentBeforeChange 字段中输出文档前像。前像包含更新之前的 temperatureSensor文档reading 字段。例如:
{ "_id" : { "_data" : "82624B21...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1649090957, 1), "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 2 }, "updateDescription" : { "updatedFields" : { "reading" : 22.1 }, "removedFields" : [ ], "truncatedArrays" : [ ] }, "fullDocumentBeforeChange" : { "_id" : 2, "reading" : 24.3 } }
有文档后像的变更流
使用 fullDocument: "whenAvailable" 设置输出文档后像(如果可用)。后像是指插入、替换或更新后的文档。已删除的文档没有后像。
以下示例使用 fullDocument:
"whenAvailable" 为 temperatureSensor 集合创建变更流游标:
watchCursorFullDocument = db.temperatureSensor.watch( [], { fullDocument: "whenAvailable" } )
以下示例使用游标检查新的变更流事件:
while ( !watchCursorFullDocument.isClosed() ) { if ( watchCursorFullDocument.hasNext() ) { printjson( watchCursorFullDocument.next() ); } }
在示例中:
while循环将一直运行,直到游标关闭。如果该游标包含文档,则
hasNext()返回true。
以下示例更新temperatureSensor文档的reading字段:
db.temperatureSensor.updateOne( { _id: 1 }, { $set: { reading: 29.5 } } )
更新 temperatureSensor 文档后,变更事件会在 fullDocument 字段中输出文档后图像。后图像包含更新之后的 temperatureSensor 文档reading字段。例如:
{ "_id" : { "_data" : "8262474D...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1648840090, 1), "fullDocument" : { "_id" : 1, "reading" : 29.5 }, "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 1 }, "updateDescription" : { "updatedFields" : { "reading" : 29.5 }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
使用 Aggregation Pipeline 筛选器的 Change Stream
以下操作使用聚合管道打开针对 data.sensors 集合的变更流游标,从而仅筛选 insert 事件:
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.hasNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
变更流游标仅返回 operationType 为 insert 的变更事件。如需变更流输出的完整文档,请参阅变更事件。
恢复 Change Stream
变更流游标返回的每份文档均包含一个恢复令牌作为 _id 字段。要恢复变更流,请将要恢复的变更事件的整个 _id 文档传递给 watch() 的 resumeAfter 或 startAfter 选项。
以下操作使用恢复令牌恢复针对 data.sensors 集合的变更流游标。这个示例假设生成恢复令牌的操作尚未从集群的 oplog 中删除。
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
迭代游标以检查是否存在新事件。使用 cursor.isClosed() 方法和 cursor.hasNext() 方法,确保循环仅在变更流游标已关闭且最新批次中没有剩余对象时才退出:
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
请参阅恢复变更流,获取恢复变更流的完整文档。