Docs 菜单
Docs 主页
/ /

打开变更流

在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。

您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。

通过以下部分了解如何打开和配置change stream:

  • 打开变更流

  • 将聚合操作符应用于change stream

  • 包含前像和后像

提示

Atlas Stream Processing

作为变更流的替代方案,您可以使用Atlas Stream Processing来处理和转换数据流。与仅注册数据库事件的变更流不同,Atlas Stream Processing托管多种数据事件类型并提供扩展的数据处理功能。要学习;了解有关此功能的更多信息,请参阅MongoDB Atlas文档中的Atlas Stream Processing

您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。

要打开change stream,请在实例上调用watch() MongoCollectionMongoDatabaseMongoClient方法。

重要

独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog MongoDB Server手册页面。

您对其调用 watch() 方法的对象决定了变更流侦听的事件范围:

  • MongoCollection.watch() 监控集合。

  • MongoDatabase.watch() 监控数据库中的所有集合。

  • MongoClient.watch() 监控已连接MongoDB 部署中的所有更改。

watch() 方法将可选的聚合管道作为第一个参数,其中包含可用于过滤和转换变更事件输出的阶段列表,如下所示:

List<Bson> pipeline = List.of(
Aggregates.match(
Filters.in("operationType",
List.of("insert", "update"))),
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

注意

对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP调用ChangeStreamIterable对象的fullDocument()成员方法,如下所示:

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

watch() 方法返回一个 ChangeStreamIterable实例,该接口提供多种访问权限、组织和遍历结果的方法。ChangeStreamIterable 还从其父接口 MongoIterable 继承方法,该接口实施核心Java接口 Iterable

您可以对 ChangeStreamIterable 调用 forEach(),从而在事件发生时对其进行处理,也可以使用 iterator() 方法返回可用于遍历结果的 MongoChangeStreamCursor 实例。

您可以在 MongoChangeStreamCursor实例上调用以下方法:

  • hasNext():检查是否有更多结果

  • next():返回集合中的下一个文档

  • tryNext():立即返回变更流中的下一个可用元素,或者 null

重要

迭代游标会阻塞当前线程

使用 forEach() 或任何 iterator() 方法遍历游标会阻止当前线程,而相应的变更流会侦听事件。如果您的程序需要继续执行其他逻辑,例如处理请求或响应用户输入,请考虑在单独的线程中创建和侦听变更流。

与其他查询返回的 MongoCursor 不同,与变更流关联的 MongoChangeStreamCursor 会等到变更事件到达,然后再从 next() 返回结果。因此,使用变更流的 MongoChangeStreamCursor 调用 next() 永远不会引发 java.util.NoSuchElementException

要配置用于处理从变更流返回的文档的选项,请使用 watch() 返回的 ChangeStreamIterable 对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamIterable API 文档链接。

myColl此示例展示了如何在collection上打开change stream,并在events发生时打印这些事件。

驱动程序将 change stream 事件存储在类型为ChangeStreamIterable的变量中。在以下示例中,我们指定驱动程序应使用Document类型填充ChangeStreamIterable对象。 因此,驱动程序会将各个change stream事件存储为ChangeStreamDocument对象。

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

对collection执行插入操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=insert,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

注意

设置示例

This example connects to an instance of MongoDB by using a connection URI. To learn more about connecting to your MongoDB instance, see the Create a MongoClient guide. This example also uses the movies collection in the sample_mflix database included in the Atlas sample datasets. You can load them into your database on the free tier of MongoDB Atlas by following the MongoDB Get Started.

此示例演示了如何使用 watch 方法打开变更流。Watch.java文件使用管道作为参数调用 watch() 方法,以仅过滤"insert""update" 事件。WatchCompanion.java文件插入、更新和删除文档。

要使用以下示例,请按以下顺序运行文件:

  1. 运行 Watch.java 文件。

  2. 运行 WatchCompanion.java 文件。

注意

Watch.java文件将继续运行,直到 WatchCompanion.java文件运行。

Watch.java:

/**
* This file demonstrates how to open a change stream by using the Java driver.
* It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens
* to change events in the "movies" collection. The code uses a change stream with a pipeline
* to only filter for "insert" and "update" events.
*/
package org.example;
import java.util.Arrays;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.Aggregates;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
// Creates instructions to match insert and update operations
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
// Creates a change stream that receives change events for the specified operations
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
final int[] numberOfEvents = {0};
// Prints a message each time the change stream receives a change event, until it receives two events
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion.java:

// Performs CRUD operations to generate change events when run with the Watch application
package org.example;
import org.bson.Document;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.client.result.DeleteResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
// Inserts a sample document into the "movies" collection and print its ID
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Inserted document id: " + insertResult.getInsertedId());
// Updates the sample document and prints the number of modified documents
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
// Deletes the sample document and prints the number of deleted documents
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
// Prints a message if any exceptions occur during the operations
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

上述应用程序将生成以下输出:

Watch.java 将仅捕获 insertupdate 操作,因为聚合管道会过滤掉 delete 操作:

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

WatchCompanion 将打印其完成的操作摘要:

Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

要了解有关watch()方法的详情,请参阅以下 API 文档:

您可以将聚合管道作为参数传递给watch()方法,以指定change stream接收哪些事件。

要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。

以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:

MongoCollection<Document> collection = database.getCollection("myColl");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline);
changeStream.forEach(event ->
System.out.println("Received a change to the collection: " + event));

对集合执行更新操作会产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
...
}

从 MongoDB 7.0 开始,您可以使用$changeStreamSplitLargeEvent聚合阶段将超过 16 MB 的事件分割成更小的片段。

仅在绝对必要时使用$changeStreamSplitLargeEvent 。 例如,如果您的应用程序需要完整的文档前图像或帖子图像,并生成超过 16 MB 的事件,请使用$changeStreamSplitLargeEvent

$changeStreamSplitLargeEvent 阶段按顺序返回片段。 您可以使用change stream游标访问这些片段。SplitEvent每个片段都包括一个包含以下字段的对象:

字段
说明

fragment

片段的索引,起始于 1

of

组成分割事件的分片总数

以下示例通过使用$changeStreamSplitLargeEvent聚合阶段分割大型事件来修改change stream:

ChangeStreamIterable<Document> changeStream = collection.watch(
List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));

注意

聚合管道中只能有一个$changeStreamSplitLargeEvent阶段,并且它必须是管道中的最后一个阶段。

您可以在change stream游标上调用getSplitEvent()方法来访问SplitEvent ,如以下示例所示:

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor();
SplitEvent event = cursor.tryNext().getSplitEvent();

有关$changeStreamSplitLargeEvent聚合阶段的更多信息,请参阅$changeStreamSplitLargeEvent服务器文档。

您可以配置变更事件以包含或省略以下数据:

  • 前像,表示操作前文档版本的文档(如果存在)

  • 帖子-图像,表示操作后文档版本的文档(如果存在)

重要

仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。

要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:

  • 为 MongoDB 部署中的collection启用前像和帖子。

    提示

    要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流

    要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。

  • 配置你的 change stream 以检索前映像和后映像中的一个或两个。

    提示

    要配置变更流以记录变更事件中的前像,请参阅前像配置示例。

    要配置变更流以记录变更事件中的后映像,请参阅后映像配置示例。

要使用驱动程序创建启用了前图像和帖子图像选项的collection,请指定instance的实例并调用ChangeStreamPreAndPostImagesOptions createCollection()方法,如以下示例所示:

CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
database.createCollection("myColl", collectionOptions);

您可以通过从 MongoDB Shell 运行collMod命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 手册中有关 collMod MongoDB Server的条目。

警告

如果在collection上启用了前映像或帖子,则使用collMod修改这些设置可能会导致该collection上的现有change stream失败。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED选项。此选项将 change stream 配置为需要预映像来替换、更新和删除事件。如果前像不可用,则驱动程序会引发错误。

假设您将文档中amount字段的值从150更新为2000 。 此变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=null,
fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}},
...
}

有关选项列表,请参阅 FullDocumentBeforeChange API文档。

以下代码示例展示如何在myColl collection 上配置 change stream 以包含前映像并输出任何事件:

MongoCollection<Document> collection = database.getCollection("myColl");
ChangeStreamIterable<Document> changeStream = collection.watch()
.fullDocument(FullDocument.WHEN_AVAILABLE);
changeStream.forEach(event ->
System.out.println("Received a change: " + event));

前面的示例将change stream配置为使用FullDocument.WHEN_AVAILABLE选项。此选项将change stream配置为为替换和更新事件返回已修改文档的帖子(如果可用)。

假设您将文档中color字段的值从"purple"更新为"pink" 。 变更事件产生以下输出:

Received a change: ChangeStreamDocument{
operationType=update,
resumeToken={"_data": "..."},
namespace=myDb.myColl,
destinationNamespace=null,
fullDocument=Document{{_id=..., color=purple, ...}},
updatedFields={"color": purple},
...
}

有关选项列表,请参阅 FullDocument API文档。

有关用于管理变更流的方法和类的更多信息,请参阅以下API文档:

后退

监控

在此页面上