Docs 菜单
Docs 主页
/ / /
Java (Sync) 驱动程序
/

打开变更流

在此页面上

  • Overview
  • 打开变更流
  • 选择要观察的范围
  • 筛选事件
  • 管理输出
  • 例子
  • 观看示例:完整文件
  • 完整文件示例输出
  • 将聚合操作符应用于change stream
  • 例子
  • 分割大型change stream事件
  • 包含前像和后像
  • 创建启用前像和后像的集合
  • 前像配置示例
  • 帖子图像配置示例
  • 更多信息
  • API 文档
  • 服务器手册条目

在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。

您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。

通过以下部分了解如何打开和配置change stream:

  • 打开变更流

  • 将聚合操作符应用于change stream

  • 包含前像和后像

您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。

要打开change stream,请在实例上调用watch() MongoCollectionMongoDatabaseMongoClient方法。

重要

独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog MongoDB Server手册页面。

您对其调用 watch() 方法的对象决定了变更流侦听的事件范围:

  • MongoCollection.watch() 监控集合。

  • MongoDatabase.watch() 监控数据库中的所有集合。

  • MongoClient.watch() 监控已连接MongoDB 部署中的所有更改。

watch()方法将可选的聚合管道作为第一个参数,其中包含可用于过滤和转换变更事件输出的阶段列表,如下所示:

List<Bson> pipeline = List.of(
Aggregates.match(
Filters.in("operationType",
List.of("insert", "update"))),
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

注意

对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP调用ChangeStreamIterable对象的fullDocument()成员方法,如下所示:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

watch() 方法返回一个 ChangeStreamIterable实例,该接口提供多种访问权限、组织和遍历结果的方法。 ChangeStreamIterable 还从其父接口 MongoIterable 继承方法,该接口实施核心Java接口 Iterable

您可以对 ChangeStreamIterable 调用 forEach(),从而在事件发生时对其进行处理,也可以使用 iterator() 方法返回可用于遍历结果的 MongoChangeStreamCursor 实例。

您可以在 MongoChangeStreamCursor实例上调用以下方法:

  • hasNext():检查是否有更多结果

  • next():返回集合中的下一个文档

  • tryNext():立即返回变更流中的下一个可用元素,或者 null

重要

迭代游标会阻塞当前线程

使用 forEach() 或任何 iterator() 方法遍历游标会阻止当前线程,而相应的变更流会侦听事件。如果您的程序需要继续执行其他逻辑,例如处理请求或响应用户输入,请考虑在单独的线程中创建和侦听变更流。

与其他查询返回的 MongoCursor 不同,与变更流关联的 MongoChangeStreamCursor 会等到变更事件到达,然后再从 next() 返回结果。因此,使用变更流的 MongoChangeStreamCursor 调用 next() 永远不会引发 java.util.NoSuchElementException

要配置用于处理从变更流返回的文档的选项,请使用 watch() 返回的 ChangeStreamIterable 对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamIterable API 文档链接。

myColl此示例展示了如何在collection上打开change stream,并在events发生时打印这些事件。

驱动程序将 change stream 事件存储在类型为ChangeStreamIterable的变量中。在以下示例中,我们指定驱动程序应使用Document类型填充ChangeStreamIterable对象。 因此,驱动程序会将各个change stream事件存储为ChangeStreamDocument对象。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

对collection执行插入操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

注意

设置示例

此示例使用连接 URI 连接到MongoDB实例。要学习;了解有关连接到MongoDB实例的更多信息,请参阅 创建 MongoClient指南。此示例还使用Atlas示例数据集包含的 moviessample_mflix数据库中的 集合。您可以按照Atlas入门指南,将它们加载到MongoDB Atlas免费套餐上的数据库中。

此示例演示了如何使用 watch 方法打开变更流。 Watch.java文件使用管道作为参数调用 watch() 方法,以仅过滤"insert""update" 事件。 WatchCompanion.java文件插入、更新和删除文档。

要使用以下示例,请按以下顺序运行文件:

  1. 运行 Watch.java 文件。

  2. 运行 WatchCompanion.java 文件。

注意

Watch.java文件将继续运行,直到 WatchCompanion.java文件运行。

Watch.java:

/**
* This file demonstrates how to open a change stream by using the Java driver.
* It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
* to change events in the "movies" collection. The code uses a change stream with a pipeline
* to only filter for "insert" and "update" events.
*/
package org.example;
import java.util.Arrays;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.Aggregates;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
// Creates instructions to match insert and update operations
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
// Creates a change stream that receives change events for the specified operations
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
final int[] numberOfEvents = {0};
// Prints a message each time the change stream receives a change event, until it receives two events
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion.java:

// Performs CRUD operations to generate change events when run with the Watch application
package org.example;
import org.bson.Document;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.result.DeleteResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
// Inserts a sample document into the "movies" collection and print its ID
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Inserted document id: " + insertResult.getInsertedId());
// Updates the sample document and prints the number of modified documents
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
// Deletes the sample document and prints the number of deleted documents
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
// Prints a message if any exceptions occur during the operations
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

上述应用程序将生成以下输出:

Watch.java 将仅捕获 insertupdate 操作,因为聚合管道会过滤掉 delete 操作:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

WatchCompanion 将打印其完成的操作摘要:

Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

要了解有关watch()方法的详情,请参阅以下 API 文档:

您可以将聚合管道作为参数传递给watch()方法,以指定change stream接收哪些事件。

要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。

以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

对集合执行更新操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

从 MongoDB 7.0 开始,您可以使用$changeStreamSplitLargeEvent聚合阶段将超过 16 MB 的事件分割成更小的片段。

仅在绝对必要时使用$changeStreamSplitLargeEvent 。 例如,如果您的应用程序需要完整的文档前图像或帖子图像,并生成超过 16 MB 的事件,请使用$changeStreamSplitLargeEvent

$changeStreamSplitLargeEvent 阶段按顺序返回片段。 您可以使用change stream游标访问这些片段。SplitEvent每个片段都包括一个包含以下字段的对象:

字段
说明

fragment

片段的索引,起始于 1

of

组成分割事件的分片总数

以下示例通过使用$changeStreamSplitLargeEvent聚合阶段分割大型事件来修改change stream:

ChangeStreamIterable<Document> changeStream = collection.watch(
List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

注意

聚合管道中只能有一个$changeStreamSplitLargeEvent阶段,并且它必须是管道中的最后一个阶段。

您可以在change stream游标上调用getSplitEvent()方法来访问SplitEvent ,如以下示例所示:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

有关$changeStreamSplitLargeEvent聚合阶段的更多信息,请参阅$changeStreamSplitLargeEvent服务器文档。

您可以配置变更事件以包含或省略以下数据:

  • 前像,表示操作前文档版本的文档(如果存在)

  • 帖子-图像,表示操作后文档版本的文档(如果存在)

重要

仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。

要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:

  • 为 MongoDB 部署中的collection启用前像和帖子。

    提示

    要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流

    要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。

  • 配置你的 change stream 以检索前映像和后映像中的一个或两个。

    提示

    要配置变更流以记录变更事件中的前像,请参阅前像配置示例。

    要配置变更流以记录变更事件中的后映像,请参阅后映像配置示例。

要使用驱动程序创建启用了前图像和帖子图像选项的collection,请指定instance的实例并调用ChangeStreamPreAndPostImagesOptions createCollection()方法,如以下示例所示:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

您可以通过从 MongoDB Shell 运行collMod命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 手册中有关 collMod MongoDB Server的条目。

警告

如果在collection上启用了前映像或帖子,则使用collMod修改这些设置可能会导致该collection上的现有change stream失败。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED选项。此选项将 change stream 配置为需要预映像来替换、更新和删除事件。如果前像不可用,则驱动程序会引发错误。

假设您将文档中amount字段的值从150更新为2000 。 此变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

有关选项列表,请参阅 FullDocumentBeforeChange API 文档。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocument.WHEN_AVAILABLE选项。此选项将change stream配置为为替换和更新事件返回已修改文档的帖子(如果可用)。

假设您将文档中color字段的值从"purple"更新为"pink" 。 变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

有关选项列表,请参阅 FullDocument API 文档。

有关用于管理变更流的方法和类的更多信息,请参阅以下API文档:

后退

监控