打开变更流
在此页面上
Overview
在本指南中,您可以了解如何使用change stream来监控数据库的实时更改。change stream 是 MongoDB Server 的一项功能,允许应用程序订阅单个 collection、数据库或部署上的数据更改。
您可以指定一组聚合操作符来筛选和转换应用程序接收的数据。 连接到 MongoDB 部署 v6.0 或更高版本时,您还可以配置事件以包含更改之前和之后的文档数据。
通过以下部分了解如何打开和配置change stream:
打开变更流
您可以打开变更流来订阅特定类型的数据变更,并在应用程序中生成变更事件。
选择要观察的范围
要打开change stream,请在实例上调用watch()
MongoCollection
MongoDatabase
MongoClient
方法。
重要
独立运行的 MongoDB 部署不支持变更流,因为该功能需要副本集 oplog。 要了解有关oplog的更多信息,请参阅副本集oplog MongoDB Server手册页面。
您对其调用 watch()
方法的对象决定了变更流侦听的事件范围:
MongoCollection.watch()
监控集合。MongoDatabase.watch()
监控数据库中的所有集合。MongoClient.watch()
监控已连接MongoDB 部署中的所有更改。
筛选事件
watch()
方法将可选的聚合管道作为第一个参数,其中包含可用于过滤和转换变更事件输出的阶段列表,如下所示:
List<Bson> pipeline = List.of( Aggregates.match( Filters.in("operationType", List.of("insert", "update"))), Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
注意
对于更新操作变更事件,变更流默认仅返回修改的字段,而不是整个更新的文档。您可以将更改流配置为返回文档的最新版本,方法是使用值FullDocument.UPDATE_LOOKUP
调用ChangeStreamIterable
对象的fullDocument()
成员方法,如下所示:
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
管理输出
watch()
方法返回一个 ChangeStreamIterable
实例,该接口提供多种访问权限、组织和遍历结果的方法。 ChangeStreamIterable
还从其父接口 MongoIterable
继承方法,该接口实施核心Java接口 Iterable
。
您可以对 ChangeStreamIterable
调用 forEach()
,从而在事件发生时对其进行处理,也可以使用 iterator()
方法返回可用于遍历结果的 MongoChangeStreamCursor
实例。
您可以在 MongoChangeStreamCursor
实例上调用以下方法:
hasNext()
:检查是否有更多结果next()
:返回集合中的下一个文档tryNext()
:立即返回变更流中的下一个可用元素,或者null
重要
迭代游标会阻塞当前线程
使用 forEach()
或任何 iterator()
方法遍历游标会阻止当前线程,而相应的变更流会侦听事件。如果您的程序需要继续执行其他逻辑,例如处理请求或响应用户输入,请考虑在单独的线程中创建和侦听变更流。
与其他查询返回的 MongoCursor
不同,与变更流关联的 MongoChangeStreamCursor
会等到变更事件到达,然后再从 next()
返回结果。因此,使用变更流的 MongoChangeStreamCursor
调用 next()
永远不会引发 java.util.NoSuchElementException
。
要配置用于处理从变更流返回的文档的选项,请使用 watch()
返回的 ChangeStreamIterable
对象的成员方法。有关可用方法的更多详细信息,请参阅本示例底部的 ChangeStreamIterable
API 文档链接。
例子
myColl
此示例展示了如何在collection上打开change stream,并在events发生时打印这些事件。
驱动程序将 change stream 事件存储在类型为ChangeStreamIterable
的变量中。在以下示例中,我们指定驱动程序应使用Document
类型填充ChangeStreamIterable
对象。 因此,驱动程序会将各个change stream事件存储为ChangeStreamDocument
对象。
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
对collection执行插入操作会产生以下输出:
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
观看示例:完整文件
注意
设置示例
此示例使用连接 URI 连接到MongoDB实例。要学习;了解有关连接到MongoDB实例的更多信息,请参阅 创建 MongoClient指南。此示例还使用Atlas示例数据集包含的 movies
sample_mflix
数据库中的 集合。您可以按照Atlas入门指南,将它们加载到MongoDB Atlas免费套餐上的数据库中。
此示例演示了如何使用 watch 方法打开变更流。 Watch.java
文件使用管道作为参数调用 watch()
方法,以仅过滤"insert"
和 "update"
事件。 WatchCompanion.java
文件插入、更新和删除文档。
要使用以下示例,请按以下顺序运行文件:
运行
Watch.java
文件。运行
WatchCompanion.java
文件。
注意
Watch.java
文件将继续运行,直到 WatchCompanion.java
文件运行。
Watch.java
:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package org.example; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.Aggregates; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion.java
:
// Performs CRUD operations to generate change events when run with the Watch application package org.example; import org.bson.Document; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.model.Updates; import com.mongodb.client.result.UpdateResult; import com.mongodb.client.result.DeleteResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
完整文件示例输出
上述应用程序将生成以下输出:
Watch.java
将仅捕获 insert
和 update
操作,因为聚合管道会过滤掉 delete
操作:
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
WatchCompanion
将打印其完成的操作摘要:
Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
要了解有关watch()
方法的详情,请参阅以下 API 文档:
将聚合操作符应用于change stream
您可以将聚合管道作为参数传递给watch()
方法,以指定change stream接收哪些事件。
要了解您的 MongoDB Server 版本支持哪些聚合操作符,请参阅修改变更流输出。
例子
以下代码示例展示了如何应用聚合管道来配置change stream,从而仅接收插入和更新操作的事件:
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
对集合执行更新操作会产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
分割大型change stream事件
从 MongoDB 7.0 开始,您可以使用$changeStreamSplitLargeEvent
聚合阶段将超过 16 MB 的事件分割成更小的片段。
仅在绝对必要时使用$changeStreamSplitLargeEvent
。 例如,如果您的应用程序需要完整的文档前图像或帖子图像,并生成超过 16 MB 的事件,请使用$changeStreamSplitLargeEvent
。
$changeStreamSplitLargeEvent 阶段按顺序返回片段。 您可以使用change stream游标访问这些片段。SplitEvent
每个片段都包括一个包含以下字段的对象:
字段 | 说明 |
---|---|
| 片段的索引,起始于 |
| 组成分割事件的分片总数 |
以下示例通过使用$changeStreamSplitLargeEvent
聚合阶段分割大型事件来修改change stream:
ChangeStreamIterable<Document> changeStream = collection.watch( List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
注意
聚合管道中只能有一个$changeStreamSplitLargeEvent
阶段,并且它必须是管道中的最后一个阶段。
您可以在change stream游标上调用getSplitEvent()
方法来访问SplitEvent
,如以下示例所示:
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
有关$changeStreamSplitLargeEvent
聚合阶段的更多信息,请参阅$changeStreamSplitLargeEvent服务器文档。
包含前像和后像
您可以配置变更事件以包含或省略以下数据:
前像,表示操作前文档版本的文档(如果存在)
帖子-图像,表示操作后文档版本的文档(如果存在)
重要
仅当您的部署使用 MongoDB v6.0 或更高版本时,才能对collection启用前像和帖子。
要接收包含前映像或帖子映像的change stream事件,您必须执行以下操作:
为 MongoDB 部署中的collection启用前像和帖子。
提示
要了解如何在部署上启用前像和后像,请参阅服务器手册中的带文档前像和后像的变更流。
要了解如何指示驱动程序创建启用了前图像和后图像的集合,请参阅创建启用了前图像和后图像的集合部分。
配置你的 change stream 以检索前映像和后映像中的一个或两个。
创建启用前像和后像的集合
要使用驱动程序创建启用了前图像和帖子图像选项的collection,请指定instance的实例并调用ChangeStreamPreAndPostImagesOptions
createCollection()
方法,如以下示例所示:
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
您可以通过从 MongoDB Shell 运行collMod
命令来更改现有集合中的前图像和后图像选项。 要了解如何执行此操作,请参阅 手册中有关 collMod MongoDB Server的条目。
警告
如果在collection上启用了前映像或帖子,则使用collMod
修改这些设置可能会导致该collection上的现有change stream失败。
前像配置示例
以下代码示例展示如何在myColl
collection 上配置 change stream 以包含前映像并输出任何事件:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
前面的示例将change stream配置为使用FullDocumentBeforeChange.REQUIRED
选项。此选项将 change stream 配置为需要预映像来替换、更新和删除事件。如果前像不可用,则驱动程序会引发错误。
假设您将文档中amount
字段的值从150
更新为2000
。 此变更事件产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
有关选项列表,请参阅 FullDocumentBeforeChange API 文档。
帖子图像配置示例
以下代码示例展示如何在myColl
collection 上配置 change stream 以包含前映像并输出任何事件:
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
前面的示例将change stream配置为使用FullDocument.WHEN_AVAILABLE
选项。此选项将change stream配置为为替换和更新事件返回已修改文档的帖子(如果可用)。
假设您将文档中color
字段的值从"purple"
更新为"pink"
。 变更事件产生以下输出:
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
有关选项列表,请参阅 FullDocument API 文档。
更多信息
API 文档
有关用于管理变更流的方法和类的更多信息,请参阅以下API文档: