对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

使用变更流监控数据

在本指南中,您可以了解如何使用变更流来监控数据库的实时更改。 变更流是 MongoDB Server 的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。

提示

Atlas Stream Processing

作为变更流的替代方案,您可以使用Atlas Stream Processing来处理和转换数据流。与仅注册数据库事件的变更流不同,Atlas Stream Processing托管多种数据事件类型并提供扩展的数据处理功能。要学习;了解有关此功能的更多信息,请参阅MongoDB Atlas文档中的Atlas Stream Processing

本指南中的示例使用Atlas示例数据集中sample_restaurants.restaurants集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅PyMongo入门。

要打开变更流,请调用watch()方法。 您调用watch()方法的实例决定了变更流侦听的事件范围。 您可以对以下类调用watch()方法:

  • MongoClient:监控 MongoDB 部署中的所有更改

  • Database:监控数据库中所有集合的变更

  • Collection:监控集合中的更改

以下示例在 restaurants集合上打开变更流,并在发生变更时输出变更。选择SynchronousAsynchronous标签页以查看相应的代码:

database = client["sample_restaurants"]
collection = database["restaurants"]
with collection.watch() as stream:
for change in stream:
print(change)
database = client["sample_restaurants"]
collection = database["restaurants"]
async with await collection.watch() as stream:
async for change in stream:
print(change)

要开始监视更改,运行应用程序。然后,在单独的应用程序或Shell中,修改 restaurants集合。 以下示例更新 name字段值为 Blarney Castle 的文档。选择SynchronousAsynchronous标签页以查看相应的代码:

database = client["sample_restaurants"]
collection = database["restaurants"]
query_filter = { "name": "Blarney Castle" }
update_operation = { '$set' :
{ "cuisine": "Irish" }
}
result = collection.update_one(query_filter, update_operation)
database = client["sample_restaurants"]
collection = database["restaurants"]
query_filter = { "name": "Blarney Castle" }
update_operation = { '$set' :
{ "cuisine": "Irish" }
}
result = await collection.update_one(query_filter, update_operation)

更新集合时,变更流应用程序会在发生变更时打印变更。 打印的变更事件类似于以下内容:

{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}

您可以将pipeline参数传递给watch()方法,以修改变更流输出。 此参数允许您仅监视指定的变更事件。 将参数格式设置为对象列表,每个对象代表一个聚合阶段。

您可以在pipeline参数中指定以下阶段:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

以下示例使用 pipeline 参数打开仅记录更新操作的变更流。选择SynchronousAsynchronous标签页以查看相应的代码:

change_pipeline = { "$match": { "operationType": "update" }},
with collection.watch(pipeline=change_pipeline) as stream:
for change in stream:
print(change)
change_pipeline = { "$match": { "operationType": "update" }},
async with await collection.watch(pipeline=change_pipeline) as stream:
async for change in stream:
print(change)

要了解有关修改变更流输出的更多信息,请参阅 MongoDB Server 手册中的修改变更流输出部分。

watch()方法接受可选参数,这些参数表示可用于配置操作的选项。 如果不指定任何选项,驱动程序不会自定义操作。

下表描述了可用于自定义watch()行为的选项:

属性
说明

pipeline

修改变更流输出的聚合管道阶段的列表。

full_document

指定是否显示更改后的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像

full_document_before_change

指定是否显示更改前的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像

resume_after

watch()指示 在恢复令牌中指定的操作后恢复返回更改。每个变更流事件文档都包含一个恢复令牌作为
_id字段。传递变更事件文档的整个_id 字段,表示之后要恢复的操作。
resume_after start_after与 和start_at_operation_time 互斥。

start_after

指示watch() 在恢复令牌中指定的操作后启动新的变更流。允许在无效事件后恢复通知。每个变更流事件文档都包含一个恢复令牌作为
_id字段。传递变更事件文档的整个_id 字段,表示之后要恢复的操作。
start_after resume_after与 和start_at_operation_time 互斥。

start_at_operation_time

指示watch() 仅返回指定时间戳之后发生的事件。
start_at_operation_time 与 和resume_after start_after互斥。

max_await_time_ms

服务器在返回空批处理之前等待新数据更改报告给变更流游标的最长时间(以毫秒为单位)。 默认为1000毫秒。

show_expanded_events

从 MongoDB Server v 6.0开始, 变更流支持数据定义语言 (DDL) 事件的变更通知,例如createIndexesdropIndexes事件。 要在变更流中包含扩展事件,请创建变更流游标并将此参数设置为True

batch_size

MongoDB cluster每批响应中返回的变更事件的最大数量。

collation

用于变更流游标的排序规则。

session

ClientSession的实例。

comment

要附加到操作的注释。

重要

仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。

默认情况下,当您对集合执行操作时,相应的变更事件仅包括该操作修改的字段的增量。 要查看更改之前或之后的完整文档,请在watch()方法中指定full_document_before_changefull_document参数。

前像是文档在更改之前的完整版本。 要在变更流事件中包含前像,请将full_document_before_change参数设置为以下值之一:

  • whenAvailable:仅当预像可用时,变更事件才包含变更事件的已修改文档的前像。

  • required:变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则驱动程序会引发错误。

后像是文档更改的完整版本。 要将后图像包含在变更流事件中,请将full_document参数设置为以下值之一:

  • updateLookup:更改事件包括更改后某个时间点的整个已更改文档的副本。

  • whenAvailable:仅当后图像可用时,更改事件才包含更改事件的已修改文档的后图像。

  • required:变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,驱动程序会引发错误。

以下示例对集合调用 watch() 方法,并通过指定 fullDocument 参数来包含更新文档的后像。选择SynchronousAsynchronous标签页以查看相应的代码:

database = client["sample_restaurants"]
collection = database["restaurants"]
with collection.watch(full_document='updateLookup') as stream:
for change in stream:
print(change)
database = client["sample_restaurants"]
collection = database["restaurants"]
async with await collection.watch(full_document='updateLookup') as stream:
async for change in stream:
print(change)

在变更流应用程序运行的情况下,使用前面的更新示例更新restaurants集合中的文档会打印类似于以下内容的变更事件:

{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens',
'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'},
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}

要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。

要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。

要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: