对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

监控数据变化

在本指南中,您可以学习;了解如何使用变更流来监控数据的实时变更。 变更流是MongoDB Server的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。

提示

Atlas Stream Processing

作为变更流的替代方案,您可以使用Atlas Stream Processing来处理和转换数据流。与仅注册数据库事件的变更流不同,Atlas Stream Processing托管多种数据事件类型并提供扩展的数据处理功能。要学习;了解有关此功能的更多信息,请参阅MongoDB Atlas文档中的Atlas Stream Processing

本指南中的示例使用Atlas示例数据集中的 sample_restaurants.restaurants集合。要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅.NET/ C#驱动程序入门

本页的示例使用以下 RestaurantAddressGradeEntry 类作为模型:

public class Restaurant
{
public ObjectId Id { get; set; }
public string Name { get; set; }
[BsonElement("restaurant_id")]
public string RestaurantId { get; set; }
public string Cuisine { get; set; }
public Address Address { get; set; }
public string Borough { get; set; }
public List<GradeEntry> Grades { get; set; }
}
public class Address
{
public string Building { get; set; }
[BsonElement("coord")]
public double[] Coordinates { get; set; }
public string Street { get; set; }
[BsonElement("zipcode")]
public string ZipCode { get; set; }
}
public class GradeEntry
{
public DateTime Date { get; set; }
public string Grade { get; set; }
public float? Score { get; set; }
}

注意

restaurants集合中的文档使用蛇形命名规则。本指南中的示例使用 ConventionPack 将集合中的字段反序列化为 Pascal 语句,并将它们映射到 Restaurant 类中的属性。

如需了解有关自定义序列化的更多信息,请参阅“自定义序列化”。

要打开变更流,请调用Watch()WatchAsync()方法。 调用该方法的实例决定了变更流侦听的事件范围。 您可以对以下类调用Watch()WatchAsync()方法:

  • MongoClient:监控 MongoDB 部署中的所有更改

  • Database:监控数据库中所有集合的变更

  • Collection:监控集合中的更改

以下示例在restaurants集合上打开变更流,并在发生变更时输出变更。 选择SynchronousAsynchronous标签页以查看相应的代码。

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
}
}
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change streams and print the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});

要开始监视更改,请运行应用程序。 然后,在单独的应用程序或shell中,修改 restaurants集合。 更新"name"值为"Blarney Castle"的文档会产生以下变更流输出:

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

您可以将pipeline参数传递给Watch()WatchAsync()方法,以修改变更流输出。 此参数允许您仅监视指定的更改事件。 使用EmptyPipelineDefinition类并附加相关聚合阶段方法来创建管道。

您可以在pipeline参数中指定以下聚合阶段:

  • $addFields

  • $changeStreamSplitLargeEvent

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

提示

要学习;了解如何使用 PipelineDefinitionBuilder 类构建聚合管道,请参阅 Operations with Builders指南中的聚合管道阶段。

要了解有关修改变更流输出的更多信息,请参阅 MongoDB Server 手册中的修改变更流输出部分。

以下示例使用pipeline参数打开仅记录更新操作的变更流。 选择SynchronousAsynchronous标签页以查看相应的代码。

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change streams and print the changes as they're received
using (var cursor = collection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following change: " + change);
}
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change stream and prints the changes as they're received
using (var cursor = await collection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following change: " + change);
});
}

如果应用应用程序生成大小超过 16 MB 的变更事件,服务器将返回 BSONObjectTooLarge 错误。 为避免此错误,您可以使用 $changeStreamSplitLargeEvent管道阶段将事件分割为较小的片段。 .NET/ C#驱动程序聚合API包括 ChangeStreamSplitLargeEvent() 方法,您可以使用该方法将 $changeStreamSplitLargeEvent 阶段添加到变更流管道。

此示例指示驱动程序监视超过 16 MB 限制的更改和分割更改事件。该代码会打印每个事件的变更文档,并调用辅助工具来重新组合所有事件片段:

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}

注意

我们建议重新组合变更事件片段,如前面的示例所示,但这一步是可选的。 您可以使用相同的逻辑来监视分割和完成变更事件。

前面的示例使用 GetNextChangeStreamEvent()GetNextChangeStreamEventAsync()MergeFragment() 方法将变更事件片段重新组合成单个变更流文档。 以下代码定义了这些方法:

// Fetches the next complete change stream event
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
while (changeStreamEnumerator.MoveNext())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
changeStreamEnumerator.MoveNext();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}
// Fetches the next complete change stream event
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
while (await changeStreamEnumerator.MoveNextAsync())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
await changeStreamEnumerator.MoveNextAsync();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
while (await changeStreamCursor.MoveNextAsync())
{
foreach (var changeStreamEvent in changeStreamCursor.Current)
{
yield return changeStreamEvent;
}
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}

提示

要学习;了解有关拆分大型变更事件的更多信息,请参阅MongoDB Server手册中的 $changeStreamSplitLargeEvent。

Watch()WatchAsync() 方法接受可选参数,这些参数表示可用于配置操作的选项。如果不指定任何选项,驱动程序不会自定义操作。

下表描述了可以设立用于自定义Watch()WatchAsync()行为的选项:

选项
说明

FullDocument

指定是否显示更改后的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像

FullDocumentBeforeChange

指定是否显示更改前的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像

ResumeAfter

指示Watch()WatchAsync() 在恢复令牌中指定的操作后恢复返回更改。每个变更流事件文档都包含一个恢复令牌作为
_id字段。传递变更事件文档的整个_id 字段,表示之后要恢复的操作。
ResumeAfter StartAfter与 和StartAtOperationTime 互斥。

StartAfter

指示Watch()WatchAsync() 在恢复令牌中指定的操作后启动新的变更流。允许在无效事件后恢复通知。每个变更流事件文档都包含一个恢复令牌作为
_id字段。传递变更事件文档的整个_id 字段,表示之后要恢复的操作。
StartAfter ResumeAfter与 和StartAtOperationTime 互斥。

StartAtOperationTime

指示Watch()WatchAsync() 仅返回在指定时间戳之后发生的事件。
StartAtOperationTime ResumeAfter与 和StartAfter 互斥。

MaxAwaitTime

指定服务器在返回空批处理之前等待新数据更改报告给变更流游标的最长时间(以毫秒为单位)。 默认为 1000 毫秒。

ShowExpandedEvents

从 MongoDB Server v 6.0开始, 变更流支持数据定义语言 (DDL) 事件的变更通知,例如createIndexesdropIndexes事件。 要在变更流中包含扩展事件,请创建变更流游标并将此参数设置为True

batchSize

指定变更流在每个批处理中可以返回的最大文档数,这适用于 Watch()WatchAsync()。如果未设立batchSize 选项,则监视函数的初始批处理大小为 101 个文档,后续每个批处理的最大大小为 16 兆字节 (MiB)。此选项可以实施小于 16 MiB 的限制,但不能执行大于 MiB 的限制。如果您将 batchSize设立为导致批处理大于 16 MiB 的限制,则该选项无效,并且 Watch()WatchAsync() 使用默认批处理大小。

Collation

指定要用于变更流游标的排序规则。有关更多信息,请参阅此页面的排序规则部分。

Comment

为操作附加注释。

要为操作配置排序规则,请创建 Collation 类的实例。

下表描述了 Collation 构造函数接受的参数。它还列出了相应的类属性,您可以使用这些属性读取每个设置的值。

Parameter
说明
类属性

locale

指定 Unicode 国际组件 (ICU)区域设置设置。有关支持的区域设置列表,请参阅MongoDB Server手册中的排序规则区域设置和默认参数。如果要使用简单的二进制比较,请使用

Collation.Simple静态属性返回Collation 对象,并将locale 设立为"simple"
。数据类型:string

Locale

caseLevel

(可选)指定是否包括大小写比较。当此参数为

true时,驱动程序的行为取决于strength 参数的值:

strengthCollationStrength.Primary
- 如果strengthCollationStrength.Secondary ,驾驶员会比较基本字符和大小写。
— 如果 为 ,则驾驶员会比较基本字符、变音符号、其他从节点(secondary node from replica set)差异和大小写。 — 如果 为任何其他值,则忽略此参数。当此参数为strength

false时,驾驶员不包括强度级别PrimarySecondary

的大小写比较。数据类型:boolean
默认值:false

CaseLevel

caseFirst

CaseFirst

strength

(可选)指定要执行的比较级别,如 ICU

文档中所定义。数据类型:CollationStrength
默认值:CollationStrength.Tertiary

Strength

numericOrdering

(可选)指定驾驶员是否将数字字符串作为数字进行比较。如果此参数为

true,则驾驶员将数字字符串作为数字进行比较。示例,在比较字符串10 210210

false"10 " 和 "2 " 时,驾驶员会将这些值视为 和 ,并发现 更大。如果此参数为 或已排除,则驾驶员会将数字字符串作为字符串进行比较。示例,在比较字符串1 "2 " 和10 "2 "

时,驾驶员一次比较一个字符。由于“”小于“”,因此驾驶员会发现“”小于“”。有关更多信息,请参阅MongoDB Server手册中的排序规则限制。数据类型:

boolean
默认值:false

NumericOrdering

alternate



(可选)指定驾驶员是否将空格和标点符号视为基本字符以进行比较。数据类型:CollationAlternate
默认值:CollationAlternate.NonIgnorable (空格和标点符号被视为基本字符)

Alternate

maxVariable

(可选)指定当 alternate参数为CollationAlternate.Shifted

时驾驶员认为哪些字符可忽略。数据类型:CollationMaxVariable
默认值:CollationMaxVariable.Punctuation (驾驶员忽略标点符号和空格)

MaxVariable

normalization



(可选)指定驾驶员是否根据需要对文本进行规范化。大多数文本不需要规范化。有关规范化的更多信息,请参阅 ICU 文档。数据类型:

boolean
默认值:false

Normalization

backwards

(可选)指定包含变音符号的字符串是否从后往前排序。数据类型:

boolean
默认值:false

Backwards

有关排序规则的更多信息,请参阅MongoDB Server手册中的排序规则页面。

重要

仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。

默认,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请创建一个ChangeStreamOptions对象并指定FullDocumentBeforeChangeFullDocument选项。 然后,将ChangeStreamOptions对象传递给Watch()WatchAsync()方法。

前像是文档在更改之前的完整版本。 要将前像包含在变更流事件,请将FullDocumentBeforeChange选项设立为以下值之一:

  • ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。

  • ChangeStreamFullDocumentBeforeChangeOption.Required:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。

后像是文档更改的完整版本。 要将后图像包含在变更流事件,请将FullDocument选项设立为以下值之一:

  • ChangeStreamFullDocumentOption.UpdateLookup:更改事件包括更改后某个时间点的整个已更改文档的副本。

  • ChangeStreamFullDocumentOption.WhenAvailable:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。

  • ChangeStreamFullDocumentOption.Required:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。

以下示例在集合上打开变更流,并通过指定FullDocument选项包含更新文档的后映像。 选择SynchronousAsynchronous标签页以查看相应的代码。

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = collection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
}
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using var cursor = await collection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
});

运行前面的代码示例并更新"name"值为"Blarney Castle"的文档会产生以下变更流输出:

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。

要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。

要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: