Overview
在本指南中,您可以学习;了解如何使用变更流来监控数据的实时变更。 变更流是MongoDB Server的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。
提示
Atlas Stream Processing
作为变更流的替代方案,您可以使用Atlas Stream Processing来处理和转换数据流。与仅注册数据库事件的变更流不同,Atlas Stream Processing托管多种数据事件类型并提供扩展的数据处理功能。要学习;了解有关此功能的更多信息,请参阅MongoDB Atlas文档中的Atlas Stream Processing 。
样本数据
本指南中的示例使用Atlas示例数据集中的 sample_restaurants.restaurants集合。要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅.NET/ C#驱动程序入门 。
本页的示例使用以下 Restaurant、Address 和 GradeEntry 类作为模型:
public class Restaurant { public ObjectId Id { get; set; } public string Name { get; set; } [] public string RestaurantId { get; set; } public string Cuisine { get; set; } public Address Address { get; set; } public string Borough { get; set; } public List<GradeEntry> Grades { get; set; } }
public class Address { public string Building { get; set; } [] public double[] Coordinates { get; set; } public string Street { get; set; } [] public string ZipCode { get; set; } }
public class GradeEntry { public DateTime Date { get; set; } public string Grade { get; set; } public float? Score { get; set; } }
注意
restaurants集合中的文档使用蛇形命名规则。本指南中的示例使用 ConventionPack 将集合中的字段反序列化为 Pascal 语句,并将它们映射到 Restaurant 类中的属性。
如需了解有关自定义序列化的更多信息,请参阅“自定义序列化”。
打开变更流
要打开变更流,请调用Watch()或WatchAsync()方法。 调用该方法的实例决定了变更流侦听的事件范围。 您可以对以下类调用Watch()或WatchAsync()方法:
MongoClient:监控 MongoDB 部署中的所有更改Database:监控数据库中所有集合的变更Collection:监控集合中的更改
以下示例在restaurants集合上打开变更流,并在发生变更时输出变更。 选择Synchronous或Asynchronous标签页以查看相应的代码。
var database = client.GetDatabase("sample_restaurants"); var collection = database.GetCollection<Restaurant>("restaurants"); // Opens a change stream and prints the changes as they're received using (var cursor = collection.Watch()) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine("Received the following type of change: " + change.BackingDocument); } }
var database = client.GetDatabase("sample_restaurants"); var collection = database.GetCollection<Restaurant>("restaurants"); // Opens a change streams and print the changes as they're received using var cursor = await collection.WatchAsync(); await cursor.ForEachAsync(change => { Console.WriteLine("Received the following type of change: " + change.BackingDocument); });
要开始监视更改,请运行应用程序。 然后,在单独的应用程序或shell中,修改 restaurants集合。 更新"name"值为"Blarney Castle"的文档会产生以下变更流输出:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...), "wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [], "truncatedArrays" : [] } }
修改变更流输出
您可以将pipeline参数传递给Watch()和WatchAsync()方法,以修改变更流输出。 此参数允许您仅监视指定的更改事件。 使用EmptyPipelineDefinition类并附加相关聚合阶段方法来创建管道。
您可以在pipeline参数中指定以下聚合阶段:
$addFields$changeStreamSplitLargeEvent$match$project$replaceRoot$replaceWith$redact$set$unset
提示
要学习;了解如何使用 PipelineDefinitionBuilder 类构建聚合管道,请参阅 Operations with Builders指南中的聚合管道阶段。
要了解有关修改变更流输出的更多信息,请参阅 MongoDB Server 手册中的修改变更流输出部分。
监控更新事件示例
以下示例使用pipeline参数打开仅记录更新操作的变更流。 选择Synchronous或Asynchronous标签页以查看相应的代码。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change streams and print the changes as they're received using (var cursor = collection.Watch(pipeline)) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine("Received the following change: " + change); } }
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change stream and prints the changes as they're received using (var cursor = await collection.WatchAsync(pipeline)) { await cursor.ForEachAsync(change => { Console.WriteLine("Received the following change: " + change); }); }
分割大型更改事件示例
如果应用应用程序生成大小超过 16 MB 的变更事件,服务器将返回 BSONObjectTooLarge 错误。 为避免此错误,您可以使用 $changeStreamSplitLargeEvent管道阶段将事件分割为较小的片段。 .NET/ C#驱动程序聚合API包括 ChangeStreamSplitLargeEvent() 方法,您可以使用该方法将 $changeStreamSplitLargeEvent 阶段添加到变更流管道。
此示例指示驱动程序监视超过 16 MB 限制的更改和分割更改事件。该代码会打印每个事件的变更文档,并调用辅助工具来重新组合所有事件片段:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .ChangeStreamSplitLargeEvent(); using var cursor = collection.Watch(pipeline); foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator())) { Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); }
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .ChangeStreamSplitLargeEvent(); using var cursor = await collection.WatchAsync(pipeline); await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor)) { Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); }
注意
我们建议重新组合变更事件片段,如前面的示例所示,但这一步是可选的。 您可以使用相同的逻辑来监视分割和完成变更事件。
前面的示例使用 GetNextChangeStreamEvent()、GetNextChangeStreamEventAsync() 和 MergeFragment() 方法将变更事件片段重新组合成单个变更流文档。 以下代码定义了这些方法:
// Fetches the next complete change stream event private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>( IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator) { while (changeStreamEnumerator.MoveNext()) { var changeStreamEvent = changeStreamEnumerator.Current; if (changeStreamEvent.SplitEvent != null) { var fragment = changeStreamEvent; while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) { changeStreamEnumerator.MoveNext(); fragment = changeStreamEnumerator.Current; MergeFragment(changeStreamEvent, fragment); } } yield return changeStreamEvent; } } // Merges a fragment into the base event private static void MergeFragment<TDocument>( ChangeStreamDocument<TDocument> changeStreamEvent, ChangeStreamDocument<TDocument> fragment) { foreach (var element in fragment.BackingDocument) { if (element.Name != "_id" && element.Name != "splitEvent") { changeStreamEvent.BackingDocument[element.Name] = element.Value; } } }
// Fetches the next complete change stream event private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>( IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor) { var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator(); while (await changeStreamEnumerator.MoveNextAsync()) { var changeStreamEvent = changeStreamEnumerator.Current; if (changeStreamEvent.SplitEvent != null) { var fragment = changeStreamEvent; while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) { await changeStreamEnumerator.MoveNextAsync(); fragment = changeStreamEnumerator.Current; MergeFragment(changeStreamEvent, fragment); } } yield return changeStreamEvent; } } private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>( IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor) { while (await changeStreamCursor.MoveNextAsync()) { foreach (var changeStreamEvent in changeStreamCursor.Current) { yield return changeStreamEvent; } } } // Merges a fragment into the base event private static void MergeFragment<TDocument>( ChangeStreamDocument<TDocument> changeStreamEvent, ChangeStreamDocument<TDocument> fragment) { foreach (var element in fragment.BackingDocument) { if (element.Name != "_id" && element.Name != "splitEvent") { changeStreamEvent.BackingDocument[element.Name] = element.Value; } } }
提示
要学习;了解有关拆分大型变更事件的更多信息,请参阅MongoDB Server手册中的 $changeStreamSplitLargeEvent。
修改 Watch() 行为
Watch() 和 WatchAsync() 方法接受可选参数,这些参数表示可用于配置操作的选项。如果不指定任何选项,驱动程序不会自定义操作。
下表描述了可以设立用于自定义Watch()和WatchAsync()行为的选项:
选项 | 说明 |
|---|---|
| 指定是否显示更改后的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像。 |
| 指定是否显示更改前的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像。 |
| 指示 |
| 指示 |
| 指示 |
| 指定服务器在返回空批处理之前等待新数据更改报告给变更流游标的最长时间(以毫秒为单位)。 默认为 1000 毫秒。 |
| 从 MongoDB Server v 6.0开始, 变更流支持数据定义语言 (DDL) 事件的变更通知,例如 |
| 指定变更流在每个批处理中可以返回的最大文档数,这适用于 |
| 指定要用于变更流游标的排序规则。有关更多信息,请参阅此页面的排序规则部分。 |
| 为操作附加注释。 |
排序规则
要为操作配置排序规则,请创建 Collation 类的实例。
下表描述了 Collation 构造函数接受的参数。它还列出了相应的类属性,您可以使用这些属性读取每个设置的值。
Parameter | 说明 | 类属性 |
|---|---|---|
|
| |
| (可选)指定是否包括大小写比较。当此参数为 |
|
|
|
|
| (可选)指定要执行的比较级别,如 ICU |
|
|
| |
|
|
|
| (可选)指定当 |
|
|
|
|
| (可选)指定包含变音符号的字符串是否从后往前排序。数据类型: |
|
有关排序规则的更多信息,请参阅MongoDB Server手册中的排序规则页面。
包含前像和后像
重要
仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。
默认,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请创建一个ChangeStreamOptions对象并指定FullDocumentBeforeChange或FullDocument选项。 然后,将ChangeStreamOptions对象传递给Watch()或WatchAsync()方法。
前像是文档在更改之前的完整版本。 要将前像包含在变更流事件,请将FullDocumentBeforeChange选项设立为以下值之一:
ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。ChangeStreamFullDocumentBeforeChangeOption.Required:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。
后像是文档更改后的完整版本。 要将后图像包含在变更流事件,请将FullDocument选项设立为以下值之一:
ChangeStreamFullDocumentOption.UpdateLookup:更改事件包括更改后某个时间点的整个已更改文档的副本。ChangeStreamFullDocumentOption.WhenAvailable:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。ChangeStreamFullDocumentOption.Required:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。
以下示例在集合上打开变更流,并通过指定FullDocument选项包含更新文档的后映像。 选择Synchronous或Asynchronous标签页以查看相应的代码。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; using (var cursor = collection.Watch(pipeline, options)) { foreach (var change in cursor.ToEnumerable()) { Console.WriteLine(change.FullDocument.ToBsonDocument()); } }
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; using var cursor = await collection.WatchAsync(pipeline, options); await cursor.ForEachAsync(change => { Console.WriteLine(change.FullDocument.ToBsonDocument()); });
运行前面的代码示例并更新"name"值为"Blarney Castle"的文档会产生以下变更流输出:
{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356", "cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462], "street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }
要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。
更多信息
要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: