Docs 主页 → 开发应用程序 → MongoDB Manual
Change Streams
变更流(Change Streams)允许应用程序访问实时数据变更,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。
从 MongoDB 5.1 开始,我们对变更流进行了优化,提高了资源利用率,并加快了某些聚合管道阶段的处理速度。
可用性
存储引擎。
副本集和分片集群必须使用 WiredTiger 存储引擎。 Change stream 也可以用于采用 MongoDB 的静态加密功能的部署。
副本集协议版本。
副本集和分片集群必须使用副本集协议版本 1 (
pv1
)。读关注“多数”启用。
从 MongoDB 4.2 开始,无论是否有
"majority"
读关注支持,change stream 都可用;也就是说,读关注majority
支持可以启用(默认),也可以禁用以使用 change stream。在 MongoDB 4.0 及更早版本中,变更流仅在系统启用
"majority"
读关注支持(默认)的情况下才可用。
连接
连接 change stream 可以使用带有+srv
连接选项的 DNS 种子列表,也可以在连接字符串中单独列出服务器。
如果驱动程序与变更流失去连接或连接中断,它则会尝试通过集群中具有匹配读取偏好的其他节点与变更流重新建立连接。如果驱动程序未找到具有正确读取偏好的节点,则会引发异常。
有关更多信息,请参阅连接字符串 URI 格式。
监视集合、数据库或部署
可以针对如下情况打开变更流:
目标 | 说明 |
---|---|
集合 | 可以为单个集合(除 本页上的示例使用 MongoDB 驱动程序打开并使用单个集合的变更流游标。另请参阅 |
数据库 | 从 MongoDB 4.0 开始,可以为单个数据库(不包括 有关 MongoDB 驱动程序方法,请参阅驱动程序文档。另请参阅 |
部署 | 从 MongoDB 4.0 开始,可以为部署(副本集或分片集群)打开变更流游标,以监控所有数据库(除 有关 MongoDB 驱动程序方法,请参阅驱动程序文档。另请参阅 |
注意
变更流示例
本页上的示例使用 MongoDB 驱动程序说明如何为集合打开变更流游标以及如何使用变更游标。
变更流性能考量
如果针对数据库打开的活动变更流的数量超过连接池大小,则可能会出现通知延迟。在等待下一事件的时间段内,每个变更流均会使用一个连接并对该变更流执行 getMore 操作。为避免出现延迟问题,应确保池大小应大于已打开的变更流数量。有关详情,请参阅 maxPoolSize 设置。
分片集群注意事项
在分片集群上打开变更流时:
mongos
在每个分片上创建单独的变更流。无论变更流是否针对特定的分片密钥范围,都会出现这种行为。当
mongos
收到更改流结果时,它会对这些结果进行排序和筛选。如有必要,mongos
还会执行fullDocument
查找。
为获得最佳性能,请在变更流中限制对 $lookup
查询的使用。
打开变更流
要打开变更流:
对于副本集,您可以从任何承载数据的成员发出打开 change stream 操作。
对于分片集群,必须从
mongos
中发出打开变更流操作。
以下示例将为某一集合打开一个变更流,并对游标进行迭代以检索变更流文档。[1]
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
若要从游标检索数据更改事件,请迭代使用变更流游标。有关变更流事件的信息,请参阅变更事件。
变更流游标保持打开状态,直到发生以下任一情况:
游标已明确关闭。
发生失效事件;例如删除或重命名集合。
与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为。
如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。
注意
未关闭游标的生命周期取决于语言。
[1] | 从 MongoDB 4.0 开始,可以指定 startAtOperationTime 在特定时间点打开游标。如果指定的起点在过去,它必须在 oplog 的时间范围内。 |
修改变更流输出
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
查找更新操作的完整文档
默认情况下,变更流仅在更新操作期间返回字段的增量。不过,您可以配置变更流以返回已更新文档的最新多数提交版本。
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
注意
如果在更新操作之后但在查找之前有一个或多个多数提交操作修改了更新的文档,则返回的完整文档可能显著不同于更新操作时的文档。
但是,变更流文档中包含的增量始终正确地描述应用于该变更流事件的被监控集合更改。
请参阅变更事件以了解有关变更流响应文档格式的更多信息。
恢复变更流
通过在打开游标时将恢复令牌指定为resumeAfter或startAfter ,可以恢复变更流。
resumeAfter
对于 Change Stream
您可以在打开游标时将恢复令牌传递给 resumeAfter
,从而在特定事件发生后恢复 change stream。
有关恢复令牌的更多信息,请参阅恢复令牌。
重要
如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。
在无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用
resumeAfter
恢复变更流。从 MongoDB 4开始。 2 ,您可以使用startAfter在无效事件后启动新的变更流。
startAfter
对于 Change Stream
4.2 版本中的新增功能。
您可以在打开游标时将恢复令牌传递给startAfter
,从而在特定事件之后启动新的变更流。与resumeAfter不同, startAfter
可以通过创建新的变更流,在无效事件发生后恢复通知。
有关恢复令牌的更多信息,请参阅恢复令牌。
重要
如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。
恢复令牌
恢复令牌可从多个来源获取:
源 | 说明 |
---|---|
更改事件通知包含针对 _id 字段的恢复词元: | |
该字段仅在使用 | |
getMore 命令在 cursor.postBatchResumeToken 字段中包含一个恢复令牌。 |
提示
MongoDB 提供了"snippet" ,它是mongosh
的扩展,用于解码十六进制编码的恢复令牌。
您可以安装并运行 恢复令牌 mongosh
的代码片段:
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
您还可以运行 resumetoken mongosh
npm
如果系统上安装了 ,则在命令行中(不使用 ):
npx mongodb-resumetoken-decoder <RESUME TOKEN>
请参阅以下内容了解详细信息:
从变更事件中恢复令牌
更改事件通知包含针对 _id
字段的恢复令牌:
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
来自以下项的恢复令牌: aggregate
使用 aggregate
命令时,$changeStream
聚合阶段在 cursor.postBatchResumeToken
字段中包含恢复令牌:
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
来自以下项的恢复令牌: getMore
getMore
命令还在 cursor.postBatchResumeToken
字段中包含一个恢复令牌:
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
用例
变更流对于采用业务依赖型系统的基础设施很有益处,因为数据更改一旦变为持久更改,它就会通知下游系统。例如,在实施提取、转换和加载 (ETL) 服务、跨平台同步、协作功能以及通知服务时,变更流可为开发人员节省时间。
访问控制
要打开针对特定集合的变更流,应用程序必须具有对相应集合授予
changeStream
和find
动作的特权。{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 要在单个数据库上打开变更流,应用程序必须具有对数据库中所有非
system
集合授予changeStream
和find
动作的特权。{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 要在整个部署中打开变更流,应用程序必须具有对部署中所有数据库的所有非
system
集合授予changeStream
和find
动作的特权。{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
事件通知
变更流仅在数据发生更改时通知副本集中的大多数数据承载节点。这可确保通知仅由大多数已提交且在故障情况下持续存在的更改触发。
例如,考虑一个 3 节点副本集,针对主节点打开了变更流游标。如果客户端发出插入操作,则只有在插入持续到大多数数据承载节点后,变更流才会将数据更改通知应用程序。
如果某个操作与事务相关联,则变更事件文档包括 txnNumber
和 lsid
。
排序规则
从 MongoDB 4.2 开始,变更流使用 simple
二进制比较,除非提供显式排序规则。在早期版本中,在单个集合 (db.collection.watch()
) 上打开的变更流将继承该集合的默认排序规则。
变更流和孤立文档
附带文档前映像和后映像的变更流
从 MongoDB 6.0 开始,可使用变更流事件来输出更改前后的文档版本(文档前映像和后映像):
前映像是指被替换、更新或删除之前的文档。已插入的文档没有前映像。
后图像是插入、替换或更新后的文档。 已删除的文档没有后图像。
使用
db.createCollection()
、create
或collMod
为集合启用changeStreamPreAndPostImages
。
如果图像属于以下情况,则前像和后像不可用于变更流事件:
在文档更新或删除操作时未对集合启用。
在
expireAfterSeconds
中设置的前像和后像保留时间后之后被删除。以下示例将整个集群的
expireAfterSeconds
设置为100
秒:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 以下示例将特定collection上的
expireAfterSeconds
设置为100
秒:use admin db.getSiblingDB("my_collection") .sensors.watch({ changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } }) 以下示例返回当前的
changeStreamOptions
设置,包括expireAfterSeconds
:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) 将
expireAfterSeconds
设置为off
可使用默认保留策略:将保留前像和后像,直到从 oplog 中删除对应的变更流事件。如果变更流事件从 oplog 中删除,则无论
expireAfterSeconds
前映像和后映像保留时间如何,相应的前映像和后映像也会被删除。
其他考量:
启用前像和后像会占用存储空间并增加处理时间。仅在需要时启用前像和后像。
将变更流事件大小限制为小于 16 MB。要限制事件大小,您可以:
将文档大小限制为 8 MB。如果其他 change stream 事件字段(例如
updateDescription
)不是很大,则可以在 change stream 输出中同时请求更新前的文档和更新后的文档。如果其他变更流事件字段(例如
updateDescription
)并不大,则仅请求变更流输出中最多 16 MB 的文档的后像。在以下情况下,仅请求最大 16 MB 的文档的 change stream 输出中的预映像:
文档更新仅影响文档结构或内容的一小部分,且
不会引起
replace
变更事件。replace
事件始终包含后像。
要请求前图像,请在
db.collection.watch()
中将fullDocumentBeforeChange
设置为required
或whenAvailable
。要请求后图像,您可以使用相同的方法设置fullDocument
。前像被写入
config.system.preimages
集合。config.system.preimages
集合可能会变大。要限制集合大小,可如前文所示为前映像设置expireAfterSeconds
时间。前像由后台进程异步删除。
重要
向后不兼容的功能
从 MongoDB 6.0 开始,如果您将文档前图像和后图像用于 change stream,则必须使用 collMod
命令为每个集合禁用 changeStreamPreAndPostImages,然后才能降级到早期 MongoDB 版本。
提示
另请参阅:
有关变更流事件和输出,请参阅变更事件。
要查看集合的变化,请参阅
db.collection.watch()
。有关变更流输出的完整示例,请参阅使用文档前像和后像的变更流。
有关变更流输出的完整示例,请参阅使用文档前像和后像的变更流。