对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

监控数据变化

在本指南中,您可以学习如何使用C驱动程序监控变更流,以便查看数据的实时更改。变更流是MongoDB Server 的一项功能,用于发布有关集合、数据库或部署的数据更改。您的应用程序可以订阅变更流并使用事件来执行其他操作。

The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the MongoDB Get Started guide.

要打开变更流,请调用以下与要观察的事件范围相对应的函数之一:

  • mongoc_client_watch():监控MongoDB 部署中的所有更改

  • mongoc_database_watch():监控数据库中所有集合的变更

  • mongoc_collection_watch():监控集合中的更改

以下示例在restaurants集合上打开变更流,并在发生变更时打印变更:

bson_t *pipeline = bson_new();
const bson_t *doc;
mongoc_change_stream_t *change_stream =
mongoc_collection_watch(collection, pipeline, NULL);
while (true) {
bson_error_t error;
if (mongoc_change_stream_next(change_stream, &doc)) {
char *str = bson_as_canonical_extended_json(doc, NULL);
printf("Received change: %s\n", str);
bson_free(str);
} else if (mongoc_change_stream_error_document(change_stream, &error, NULL)) {
printf("Got error on change stream: %s\n", error.message);
break;
}
}
bson_destroy(pipeline);
mongoc_change_stream_destroy(change_stream);

要开始监视更改,运行应用程序。 然后,在单独的应用程序或Shell中对 restaurants集合执行写入操作。 以下示例更新 name字段值为 "Blarney Castle" 的文档:

bson_t *filter = BCON_NEW("name", BCON_UTF8("Blarney Castle"));
bson_t *update = BCON_NEW("$set", "{", "cuisine", BCON_UTF8("Irish"), "}");
mongoc_collection_update_one(collection, filter, update, NULL, NULL, NULL);

更新集合时,变更流应用程序会在发生变更时打印变更。 打印的变更事件类似于以下内容:

{
"_id": { ... },
"operationType": "update",
"clusterTime": { ... },
"ns": {
"db": "sample_restaurants",
"coll": "restaurants"
},
"updateDescription": {
"updatedFields": {
"cuisine": "Irish"
},
"removedFields": [],
"truncatedArrays": []
}
...
}

您可以将 pipeline 参数传递给任何监视函数,以修改变更流输出。 此参数允许您仅监视指定的更改事件。 将参数格式设置为对象列表,其中每个对象代表一个聚合阶段。

您可以在pipeline参数中指定以下阶段:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

以下示例使用 pipeline 参数来包含 $match 阶段,以打开仅记录更新操作的变更流:

bson_t *pipeline = BCON_NEW(
"pipeline", "[",
"{", "$match", "{", "operationType", BCON_UTF8("update"), "}", "}",
"]");
const bson_t *doc;
mongoc_change_stream_t *change_stream =
mongoc_collection_watch(collection, pipeline, NULL);
while (mongoc_change_stream_next(change_stream, &doc)) {
char *str = bson_as_canonical_extended_json(doc, NULL);
printf("Received change: %s\n", str);
bson_free(str);
}
bson_destroy(pipeline);
mongoc_change_stream_destroy(change_stream);

要了解有关修改变更流输出的更多信息,请参阅 MongoDB Server 手册中的修改变更流输出部分。

您可以通过向函数调用传递选项来修改任何监视函数。如果不指定任何选项,驱动程序不会自定义操作。

下表描述了可用于自定义监视功能行为的选项:

选项
说明

batchSize

变更流在每个批处理中可返回的最大文档数量,这适用于 watch 函数。默认情况下,watch 函数的初始批处理大小为 101 个文档,后续批处理的最大大小为 16 梅比字节 (MiB)。此选项可以实施比 16 MiB 更小的限制,但不能实施比 MiB 更大的限制。有关详细信息,请参阅 MongoDB Server 手册中的游标批处理

comment

指定要附加到操作的注释。

fullDocument

设置 fullDocument 值。要了解更多信息,请参阅本文档的“包含前图和后图”部分。

fullDocumentBeforeChange

设置 fullDocumentBeforeChange 值。要了解更多信息,请参阅本文档的“包含前图和后图”部分。

maxAwaitTimeMS

设置此操作在服务器上的最长等待执行时间(以毫秒为单位)。

有关可用于配置监视操作的选项的完整列表,请参阅MongoDB Server手册中的监视方法指南。

重要

仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。

默认下,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请在监视函数调用中指定 fullDocumentBeforeChangefullDocument 选项。

前像是文档在更改之前的完整版本。要将前像包含在变更流事件中,请将以下值之一传递给 fullDocumentBeforeChange 选项:

  • whenAvailable:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。

  • required:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。

后像是文档更改后的完整版本。要将后图像包含在变更流事件中,请将以下值之一传递给 fullDocument 选项:

  • updateLookup:更改事件包括更改后某个时间点的整个已更改文档的副本。

  • whenAvailable:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。

  • required:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。

以下示例对集合调用 mongoc_collection_watch() 函数,并通过指定 fullDocument 选项将更新文档的后像包含在结果中:

bson_t *pipeline = bson_new();
bson_t *opts = BCON_NEW("fullDocument", BCON_UTF8("updateLookup"));
const bson_t *doc;
mongoc_change_stream_t *change_stream =
mongoc_collection_watch(collection, pipeline, opts);
while (true) {
bson_error_t error;
if (mongoc_change_stream_next(change_stream, &doc)) {
char *str = bson_as_canonical_extended_json(doc, NULL);
printf("Received change: %s\n", str);
bson_free(str);
} else if (mongoc_change_stream_error_document(change_stream, &error, NULL)) {
printf("Got error on change stream: %s\n", error.message);
break;
}
}
bson_destroy(pipeline);
bson_destroy(opts);
mongoc_change_stream_destroy(change_stream);

在变更流应用程序运行的情况下,使用前面的更新示例更新restaurants集合中的文档会打印类似于以下内容的变更事件:

{
"_id": ...,
"operationType": "update",
"clusterTime": ...,
"wallTime": ...,
"fullDocument": {
"_id": {
...
},
"address": ...,
"borough": "Queens",
"cuisine": "Irish",
"grades": [ ... ],
"name": "Blarney Castle",
"restaurant_id": ...
},
"ns": {
"db": "sample_restaurants",
"coll": "restaurants"
},
"documentKey": {
"_id": ...
},
"updateDescription": {
"updatedFields": {
"cuisine": "Irish"
},
"removedFields": [],
"truncatedArrays": []
}
}

要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。

要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。

要学习;了解有关本指南中讨论的任何函数或类型的更多信息,请参阅以下API文档: