Docs 菜单

Docs 主页开发应用程序MongoDB Manual

Change Streams

在此页面上

  • 可用性
  • 连接
  • 监视集合、数据库或部署
  • 变更流性能考量
  • 打开变更流
  • 修改变更流输出
  • 查找更新操作的完整文档
  • 恢复变更流
  • 用例
  • 访问控制
  • 事件通知
  • 排序规则
  • 变更流和孤立文档
  • 附带文档前映像和后映像的变更流

变更流(Change Streams)允许应用程序访问实时数据变更,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。

从 MongoDB 5.1 开始,我们对变更流进行了优化,提高了资源利用率,并加快了某些聚合管道阶段的处理速度。

变更流可用于副本集分片集群

连接 change stream 可以使用带有+srv连接选项的 DNS 种子列表,也可以在连接字符串中单独列出服务器。

如果驱动程序与变更流失去连接或连接中断,它则会尝试通过集群中具有匹配读取偏好的其他节点与变更流重新建立连接。如果驱动程序未找到具有正确读取偏好的节点,则会引发异常。

有关更多信息,请参阅连接字符串 URI 格式。

可以针对如下情况打开变更流:

目标
说明
集合

可以为单个集合(除 system 集合,adminlocalconfig 数据库中的任何集合)打开变更流游标。

本页上的示例使用 MongoDB 驱动程序打开并使用单个集合的变更流游标。另请参阅 mongosh方法db.collection.watch()

数据库

从 MongoDB 4.0 开始,可以为单个数据库(不包括adminlocalconfig 数据库)打开变更流游标,以监控所有非系统集合的更改。

有关 MongoDB 驱动程序方法,请参阅驱动程序文档。另请参阅mongosh方法db.watch()

部署

从 MongoDB 4.0 开始,可以为部署(副本集或分片集群)打开变更流游标,以监控所有数据库(除 adminlocalconfig 外)中对所有非系统集合的变更。

有关 MongoDB 驱动程序方法,请参阅驱动程序文档。另请参阅mongosh方法Mongo.watch()

注意

变更流示例

本页上的示例使用 MongoDB 驱动程序说明如何为集合打开变更流游标以及如何使用变更游标。

如果针对数据库打开的活动变更流的数量超过连接池大小,则可能会出现通知延迟。在等待下一事件的时间段内,每个变更流均会使用一个连接并对该变更流执行 getMore 操作。为避免出现延迟问题,应确保池大小应大于已打开的变更流数量。有关详情,请参阅 maxPoolSize 设置。

在分片集群上打开变更流时:

  • mongos每个分片上创建单独的变更流。无论变更流是否针对特定的分片密钥范围,都会出现这种行为。

  • mongos 收到更改流结果时,它会对这些结果进行排序和筛选。如有必要,mongos 还会执行 fullDocument 查找。

为获得最佳性能,请在变更流中限制对 $lookup 查询的使用。

要打开变更流:

  • 对于副本集,您可以从任何承载数据的成员发出打开 change stream 操作。

  • 对于分片集群,必须从 mongos 中发出打开变更流操作。

以下示例将为某一集合打开一个变更流,并对游标进行迭代以检索变更流文档。[1]


➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


若要从游标检索数据更改事件,请迭代使用变更流游标。有关变更流事件的信息,请参阅变更事件。

变更流游标保持打开状态,直到发生以下任一情况:

  • 游标已明确关闭。

  • 发生失效事件;例如删除或重命名集合。

  • 与 MongoDB 部署之间的连接关闭或超时。有关更多信息,请参阅游标行为

  • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭,而关闭的变更流游标可能无法完全恢复。

注意

未关闭游标的生命周期取决于语言。

[1] 从 MongoDB 4.0 开始,可以指定 startAtOperationTime 在特定时间点打开游标。如果指定的起点在过去,它必须在 oplog 的时间范围内。

➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


提示

变更流事件文档的_id字段充当恢复令牌。请勿使用管道修改或删除变更流事件的_id字段。

从 MongoDB 4开始。 2 ,如果变更流聚合管道修改了事件的_id字段,则变更流将引发异常。

请参阅变更事件以了解有关变更流响应文档格式的更多信息。

默认情况下,变更流仅在更新操作期间返回字段的增量。不过,您可以配置变更流以返回已更新文档的最新多数提交版本。


➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


注意

如果在更新操作之后但在查找之前有一个或多个多数提交操作修改了更新的文档,则返回的完整文档可能显著不同于更新操作时的文档。

但是,变更流文档中包含的增量始终正确地描述应用于该变更流事件的被监控集合更改。

请参阅变更事件以了解有关变更流响应文档格式的更多信息。

通过在打开游标时将恢复令牌指定为resumeAfterstartAfter ,可以恢复变更流。

您可以在打开游标时将恢复令牌传递给 resumeAfter ,从而在特定事件发生后恢复 change stream。

有关恢复令牌的更多信息,请参阅恢复令牌。

重要

  • 如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。

  • 无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用resumeAfter恢复变更流。从 MongoDB 4开始。 2 ,您可以使用startAfter无效事件后启动新的变更流。

4.2 版本中的新增功能

您可以在打开游标时将恢复令牌传递给startAfter ,从而在特定事件之后启动新的变更流。与resumeAfter不同, startAfter可以通过创建新的变更流,在无效事件发生后恢复通知。

有关恢复令牌的更多信息,请参阅恢复令牌。

重要

  • 如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。

恢复令牌可从多个来源获取:

说明
更改事件通知包含针对 _id 字段的恢复词元:

$changeStream 聚合阶段在 cursor.postBatchResumeToken 字段中包含恢复令牌。

该字段仅在使用 aggregate 命令时显示。

getMore 命令在 cursor.postBatchResumeToken 字段中包含一个恢复令牌。

4.2 版的变更:从 MongoDB 4.2版开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

提示

MongoDB 提供了"snippet" ,它是mongosh的扩展,用于解码十六进制编码的恢复令牌。

您可以安装并运行 恢复令牌 mongosh的代码片段:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

您还可以运行 resumetoken mongoshnpm如果系统上安装了 ,则在命令行中(不使用 ):

npx mongodb-resumetoken-decoder <RESUME TOKEN>

请参阅以下内容了解详细信息:

更改事件通知包含针对 _id 字段的恢复令牌:

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

使用 aggregate 命令时,$changeStream 聚合阶段在 cursor.postBatchResumeToken 字段中包含恢复令牌:

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore 命令还在 cursor.postBatchResumeToken 字段中包含一个恢复令牌:

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

变更流对于采用业务依赖型系统的基础设施很有益处,因为数据更改一旦变为持久更改,它就会通知下游系统。例如,在实施提取、转换和加载 (ETL) 服务、跨平台同步、协作功能以及通知服务时,变更流可为开发人员节省时间。

对于强制执行身份验证授权的部署:

  • 要打开针对特定集合的变更流,应用程序必须具有对相应集合授予 changeStreamfind 动作的特权。

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • 要在单个数据库上打开变更流,应用程序必须具有对数据库中所有非 system 集合授予 changeStreamfind 动作的特权。

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • 要在整个部署中打开变更流,应用程序必须具有对部署中所有数据库的所有非 system 集合授予 changeStreamfind 动作的特权。

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

变更流仅在数据发生更改时通知副本集中的大多数数据承载节点。这可确保通知仅由大多数已提交且在故障情况下持续存在的更改触发。

例如,考虑一个 3 节点副本集,针对主节点打开了变更流游标。如果客户端发出插入操作,则只有在插入持续到大多数数据承载节点后,变更流才会将数据更改通知应用程序。

如果某个操作与事务相关联,则变更事件文档包括 txnNumberlsid

从 MongoDB 4.2 开始,变更流使用 simple 二进制比较,除非提供显式排序规则。在早期版本中,在单个集合 (db.collection.watch()) 上打开的变更流将继承该集合的默认排序规则。

从 MongoDB5 开始。3 , 范围迁移 期间,不会为更新 孤立文档 生成 变更流 事件。

从 MongoDB 6.0 开始,可使用变更流事件来输出更改前后的文档版本(文档前映像和后映像):

如果图像属于以下情况,则前像和后像不可用于变更流事件

  • 在文档更新或删除操作时未对集合启用。

  • expireAfterSeconds 中设置的前像和后像保留时间后之后被删除。

    • 以下示例将整个集群的expireAfterSeconds设置为100秒:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } }
      } )
    • 以下示例将特定collection上的expireAfterSeconds设置为100秒:

      use admin
      db.getSiblingDB("my_collection")
      .sensors.watch({ changeStreamOptions:
      { preAndPostImages: { expireAfterSeconds: 100 } } })
    • 以下示例返回当前的 changeStreamOptions 设置,包括 expireAfterSeconds

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • expireAfterSeconds 设置为 off 可使用默认保留策略:将保留前像和后像,直到从 oplog 中删除对应的变更流事件。

    • 如果变更流事件从 oplog 中删除,则无论 expireAfterSeconds 前映像和后映像保留时间如何,相应的前映像和后映像也会被删除。

其他考量:

  • 启用前像和后像会占用存储空间并增加处理时间。仅在需要时启用前像和后像。

  • 将变更流事件大小限制为小于 16 MB。要限制事件大小,您可以:

    • 将文档大小限制为 8 MB。如果其他 change stream 事件字段(例如 updateDescription)不是很大,则可以在 change stream 输出中同时请求更新前的文档和更新后的文档。

    • 如果其他变更流事件字段(例如 updateDescription)并不大,则仅请求变更流输出中最多 16 MB 的文档的后像。

    • 在以下情况下,仅请求最大 16 MB 的文档的 change stream 输出中的预映像:

      • 文档更新仅影响文档结构或内容的一小部分,

      • 不会引起 replace 变更事件。replace 事件始终包含后像。

  • 要请求前图像,请在db.collection.watch()中将fullDocumentBeforeChange设置为requiredwhenAvailable。要请求后图像,您可以使用相同的方法设置fullDocument

  • 前像被写入 config.system.preimages 集合。

    • config.system.preimages 集合可能会变大。要限制集合大小,可如前文所示为前映像设置 expireAfterSeconds 时间。

    • 前像由后台进程异步删除。

重要

向后不兼容的功能

从 MongoDB 6.0 开始,如果您将文档前图像和后图像用于 change stream,则必须使用 collMod 命令为每个集合禁用 changeStreamPreAndPostImages,然后才能降级到早期 MongoDB 版本。

提示

另请参阅:

有关变更流输出的完整示例,请参阅使用文档前像和后像的变更流

← 时间序列集合限制