对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

Change Streams

在本指南中,您可以学习;了解如何使用变更流来监控数据的实时变更。 变更流是MongoDB Server的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。

本指南中的示例使用Atlas示例数据集中sample_restaurants.restaurants集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅入门指南。

重要

项目 Reactor 库

本指南使用 Project Reactor 库来使用Java Reactive Streams驱动程序方法返回的 Publisher 实例。要学习;了解有关 Project Reactor 库及其使用方法的更多信息,请参阅 Reactor 文档中的入门。要进一步学习;了解如何使用本指南中的 Project Reactor 库方法,请参阅“将数据写入MongoDB”指南。

要打开变更流,请调用watch()方法。 调用该方法的实例决定了变更流侦听的事件范围。 您可以对以下类的实例调用watch()方法:

  • MongoClient:监控 MongoDB 部署中的所有更改

  • MongoDatabase:监控数据库中所有集合的变更

  • MongoCollection:监控集合中的更改

以下示例在restaurants集合上打开变更流,并在发生变更时输出变更:

// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch();
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

要开始监视更改,请运行应用程序。 然后,在单独的应用程序或shell中对 restaurants集合执行写入操作。 更新"name"字段值为"Blarney Castle"的文档会产生以下变更流输出:

Received change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=null,
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Traditional Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null,
splitEvent=null, wallTime=BsonDateTime{value=...}}

您可以将聚合管道作为参数传递给watch()方法,以修改变更流输出。 此参数允许您仅监视指定的变更事件。

您可以在pipeline参数中指定以下聚合阶段:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

以下示例将聚合管道传递给变更流以仅记录更新操作:

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

要了解有关修改变更流输出的更多信息,请参阅 MongoDB Server 手册中的修改变更流输出部分。

您可以将方法链接到 watch() 方法,这些方法表示可用于配置变更流操作的选项。如果不指定任何选项,驱动程序不会自定义操作。

下表描述了可以链接到watch()以自定义其行为的方法:

选项
说明

fullDocument()

指定是否显示更改后的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像

fullDocumentBeforeChange()

指定是否显示更改前的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像

resumeAfter()

watch()指示 在恢复令牌中指定的操作后恢复返回更改。每个变更流事件文档都包含一个恢复令牌作为
_id字段。传递变更事件文档的整个_id 字段,表示之后要恢复的操作。
resumeAfter() startAfter()与 和startAtOperationTime() 互斥。

startAfter()

指示watch() 在恢复令牌中指定的操作后启动新的变更流。允许在无效事件后恢复通知。每个变更流事件文档都包含一个恢复令牌作为
_id字段。传递变更事件文档的整个_id 字段,表示之后要恢复的操作。
startAfter() resumeAfter()与 和startAtOperationTime() 互斥。

startAtOperationTime()

指示watch() 仅返回指定时间戳之后发生的事件。
startAtOperationTime() 与 和resumeAfter() startAfter()互斥。

maxAwaitTime()

指定服务器在返回空批处理之前等待新数据更改报告给变更流游标的最长时间(以毫秒为单位)。 默认为 1000 毫秒。

showExpandedEvents()

从MongoDB Server v6.0 开始, 变更流支持数据定义语言 (DDL) 事件的变更通知,例如 createIndexesdropIndexes 事件。 要在变更流中包含扩展事件,请调用此方法并传入值 true

batchSize()

指定来自MongoDB 集群的每批批处理中要返回的变更事件的最大数量。默认下,驾驶员会将此值设置为 Long.MAX_VALUE

batchSize0 表示将建立游标,但第批处理不会返回任何文档。

collation()

指定用于变更流游标的排序规则。

comment()

为操作附加注释。

重要

仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。

默认,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请将fullDocumentBeforeChange()fullDocument()方法链接到watch()方法。

前像是文档在更改之前的完整版本。 要将前像包含在变更流事件中,请将以下值之一传递给fullDocumentBeforeChange()方法:

  • FullDocumentBeforeChange.WHEN_AVAILABLE:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。

  • FullDocumentBeforeChange.REQUIRED:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。

后像是文档更改的完整版本。 要将后图像包含在变更流事件中,请将以下值之一传递给fullDocument()方法:

  • FullDocument.UPDATE_LOOKUP:更改事件包括更改后某个时间点的整个已更改文档的副本。

  • FullDocument.WHEN_AVAILABLE:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。

  • FullDocument.REQUIRED:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。

以下示例在集合上打开变更流,并通过将fullDocument()方法链接到watch()方法来包含更新文档的后映像:

// Creates a change stream pipeline
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.eq("operationType", "update"))
);
// Opens a change stream and prints the changes as they're received including the full
// document after the update
ChangeStreamPublisher<Document> changeStreamPublisher = restaurants.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
Flux.from(changeStreamPublisher)
.doOnNext(change -> System.out.println("Received change: " + change))
.blockLast();

要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。

要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。

要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: