数据库触发器可以让您在关联的 MongoDB Atlas 集群上发生数据库更改时执行服务器端逻辑。您可以在单个集合、整个数据库和整个集群上配置触发器。
与在数据库服务器上运行的SQL数据触发器不同, Atlas数据库触发器在独立于数据库服务器扩展的无服务器计算层上运行。 触发器自动调用Atlas 函数,并可通过 Amazon Web Services EventBridge 将事件转发到外部处理程序。
使用数据库触发器,实现事件驱动的数据交互。 例如,可以在相关文档发生更改时自动更新一个文档中的信息, 或者在插入新文档时向外部服务发送请求。
数据库触发器使用MongoDB 变更流来监视集合中的实时更改。变更流是一系列数据库事件,每个事件都描述对集合中文档的操作。您的应用为每个集合打开一个变更流,并至少启用一个触发。 如果为一个集合启用多个触发器,它们股票相同的变更流。
重要
变更流限制
您可以在集群上打开的 变更流 总数有限制,具体取决于集群的大小。 有关详细信息,请参阅变更流限制。
此外,您无法在 Flex 集群或联合数据库实例上定义数据库触发器,因为它们不支持变更流。
您可以控制哪些操作会导致触发器触发,以及触发器触发时会执行哪些操作。例如,您可在文档的特定字段每次更新时运行某一函数。此函数可访问整个变更事件,因此您可始终了解更改的内容。此外,还可将此变更事件传递给 AWS EventBridge 以便在 Atlas 外部处理该事件。
触发器支持 $match 表达式来过滤更改事件,支持 $project 表达式来限制每个事件中包含的数据。
警告
在部署和数据库级别触发器中,允许以导致其他触发器触发的方式配置触发器,从而导致递归。示例包括:在同一数据库内写入到一个集合的数据库级触发器,或者在同一集群中将日志写入到另一个数据库的集群级记录器或日志转发器。
创建数据库触发器
您可以从trigger Atlas用户界面或使用 创建数据库App Services CLI 。
在 Atlas 中,转到 Triggers 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击 Streaming Data 标题下的 Triggers。
会显示触发器页面。
单击 Add Trigger 打开触发器配置页面。
选择 Database trigger类型。
配置trigger ,然后单击 Save。
字段设置Trigger Type
Database or Scheduled
Watch Against
Collection、 Database或Deployment 。
Cluster Name
标识集群的名称。
Database Name
标识数据库的名称。
Collection Name
标识集合的名称。
Operation Type
导致触发器触发的操作。
对MongoDB Atlas用户进行身份验证:
使用MongoDB Atlas Administration API密钥登录App Services CLI:
appservices login --api-key="<API KEY>" --private-api-key="<PRIVATE KEY>" 拉取应用的最新配置文件:
运行以下命令以获取配置文件的本地副本:
appservices pull --remote=<App ID> 默认,该命令会将文件提取到当前工作目录中。 您可以使用可选的
--local标志指定目录路径。将数据库trigger配置文件添加到本地应用文件的
triggers子目录中。部署更改:
运行以下命令以部署更改:
appservices push
注意
Atlas不会实施trigger配置文件的特定文件名。 但是,导入后, Atlas将重命名每个配置文件,以匹配其定义的trigger名称。
配置
Database Triggers 有以下配置选项:
Trigger 详情
在 Trigger Details 部分中,您可以根据所需的粒度级别选择希望trigger执行 Watch Against 的范围。 您的选项是:
Collection,当指定集合发生更改时
Database,当指定数据库中的任何集合发生更改时
Deployment,当指定集群上出现部署更改时。
如果您选择“部署”源类型,则不会监视以下数据库的更改:
管理员数据库
admin、local和config同步数据库
__realm_sync和__realm_sync_<app_id>
重要
部署级别的源类型仅适用于专用层。
根据您使用的源类型,附加选项会有所不同。下表介绍了这些选项:
提示
原像和性能优化
原像需要额外的存储开销,可能会影响性能。如果没有在集合上使用原像,则应禁用原像。如需了解更多信息,请参阅禁用集合级原像。
运行 MongoDB 4.4+ 的非分片 Atlas 集群以及运行 MongoDB 5.3 及更高版本的分片 Atlas 集群支持文档原像。可以将非分片集群(带有原像)升级到分片集群,只要该集群运行的是 5.3 或更高版本。
触发器配置
字段 | 说明 |
|---|---|
Auto-Resume | 如果启用,当在集群的trigger 中找不到此oplog 的恢复令牌时,trigger 会在下一个相关变更流事件时自动恢复处理事件。从trigger暂停到trigger恢复执行的所有变更流事件都不会trigger 。 |
Event Ordering | 如已启用,将按照触发事件发生的顺序进行处理。如已禁用,事件可以并行处理,当许多事件同时发生时,处理速度会更快。 如果启用了事件排序,此触发器的多次执行操作将根据更改事件的时间戳按顺序进行。如果禁用了事件排序,该触发器的多次执行操作将独立进行。 |
Skip Events On Re-Enable | 默认禁用。如果启用,则不会处理在禁用此触发器时发生的任何变更事件。 |
eventType
在 Event Type(函数)部分,您可以选择触发器触发时要执行的操作。您可以选择运行函数或使用 AWS EventBridge。
高级
在 Advanced 部分中,提供了以下可选的配置选项:
字段 | 说明 | ||||||||
|---|---|---|---|---|---|---|---|---|---|
Project Expression | 一个$project表达式,用于从变更流中的每个事件中选择字段的子集。 您可以使用它来优化trigger的执行。 表达式是一个对象,可以将更改事件中的字段名称映射到
| ||||||||
Match Expression | |||||||||
Maximum Throughput | 如果链接的数据源是专用服务器(M10+ 层),则可以将最大吞吐量增加到超过默认的 10,000 个并发进程。 重要提示:要启用最大吞吐量,必须禁用事件排序。 在增加最大吞吐量之前,请考虑一个或多个触发器是否正在调用速率受限的外部 API。增加触发率可能会导致超限。 提高吞吐量还可能增加更大工作负载,影响集群的整体性能。 |
更改事件类型
数据库变更事件代表关联 MongoDB Atlas 集群的特定集合中的各个变更。
每个数据库事件都具有与底层 变更流 发出的变更事件对象相同的操作类型和结构。变更事件具有以下操作类型:
操作类型 | 说明 |
|---|---|
Insert Document(插入文档)(所有触发器类型) | 表示添加到集合的新文档。 |
Update Document(更新文档)(所有触发器类型) | 表示对集合中现有文档的更改。 |
Delete Document(删除文档)(所有触发器类型) | 表示从集合中删除的文档。 |
Replace Document(替换文档)(所有触发器类型) | 表示用于替换集合中文档的新文档。 |
创建集合(仅限数据库和部署触发器类型) | 表示创建新集合。 |
修改集合(仅限数据库和部署触发器类型) | 表示修改集合。 |
重命名集合(仅限数据库和部署触发器类型) | 表示正在重命名的集合。 |
Drop Collection(删除集合)(仅限数据库和部署触发器类型) | 表示正在删除的集合。 |
分片集合(仅限数据库和部署触发器类型) | 表示从未分片变为分片的集合。 |
重新分片集合(仅限数据库和部署触发器类型) | 表示对集合分片的更改。 |
Refine Collection Shard Key(优化集合分片键)(仅限数据库和部署触发器类型) | 表示集合分片键的变化。 |
Create Indexes(创建索引)(仅限数据库和部署触发器类型) | 表示创建新索引。 |
Drop Indexes(删除索引)(仅限数据库和部署触发器类型) | 表示正在删除的索引。 |
删除数据库(仅限部署触发器类型) | 表示正在删除的数据库。 |
数据库变更事件对象采用以下常规形式:
{ _id : <ObjectId>, "operationType": <string>, "fullDocument": <document>, "fullDocumentBeforeChange": <document>, "ns": { "db" : <string>, "coll" : <string> }, "documentKey": { "_id": <ObjectId> }, "updateDescription": <document>, "clusterTime": <Timestamp> }
数据库触发器示例
一家在线商店希望在客户订单发生变化时通知其客户。他们将 store.orders 集合中的每个订单记录为类似于以下内容的文档:
{ _id: ObjectId("59cf1860a95168b8f685e378"), customerId: ObjectId("59cf17e1a95168b8f685e377"), orderDate: ISODate("2018-06-26T16:20:42.313Z"), shipDate: ISODate("2018-06-27T08:20:23.311Z"), orderContents: [ { qty: 1, name: "Earl Grey Tea Bags - 100ct", price: NumberDecimal("10.99") } ], shippingLocation: [ { location: "Memphis", time: ISODate("2018-06-27T18:22:33.243Z") }, ] }
为了自动执行此进程,存储创建了一个数据库trigger ,用于侦听 store.orders集合中的 Update 更改事件。 当trigger观察到 Update事件时,它会将变更事件对象传递给其关联函数 textShippingUpdate。 该函数会检查该变更事件以了解对shippingLocation字段的任何更改,如果字段已更新,则向客户发送一条包含订单新位置的文本消息。
字段 | 设置 |
|---|---|
Trigger Type | Database |
Watch Against | Collection |
Cluster Name |
|
Database Name |
|
Collection Name |
|
Operation Type |
|
{ "type": "DATABASE", "name": "shippingLocationUpdater", "function_name": "textShippingUpdate", "config": { "service_name": "mongodb-atlas", "database": "store", "collection": "orders", "operation_types": ["UPDATE"], "unordered": false, "full_document": true, "match": {} }, "disabled": false }
exports = async function (changeEvent) { // Destructure out fields from the change stream event object const { updateDescription, fullDocument } = changeEvent; // Check if the shippingLocation field was updated const updatedFields = Object.keys(updateDescription.updatedFields); const isNewLocation = updatedFields.some(field => field.match(/shippingLocation/) ); // If the location changed, text the customer the updated location. if (isNewLocation) { const { customerId, shippingLocation } = fullDocument; const mongodb = context.services.get("mongodb-atlas"); const customers = mongodb.db("store").collection("customers"); const { location } = shippingLocation.pop(); const customer = await customers.findOne({ _id: customerId }); const twilio = require('twilio')( // Your Account SID and Auth Token from the Twilio console: context.values.get("TwilioAccountSID"), context.values.get("TwilioAuthToken"), ); await twilio.messages.create({ To: customer.phoneNumber, From: context.values.get("ourPhoneNumber"), Body: `Your order has moved! The new location is ${location}.` }) } };
暂停触发器
数据库触发器可能会进入暂停状态,以响应阻止触发器的变更流继续的事件。可以暂停触发器的事件包括:
无效事件,例如
dropDatabase、renameCollection或网络中断导致的事件。恢复变更流所需的恢复令牌已不在集群 oplog 中。应用日志将此称为
ChangeStreamHistoryLost错误。$match阶段从oplog中过滤掉包含恢复令牌的 oplog条目。下一个匹配事件将没有恢复令牌,因此触发器无法恢复。
事件trigger暂停或失败, Atlas会向项目所有者发送电子邮件,提醒他们注意该问题。
自动恢复暂停的触发器
您可以将触发器配置为在触发器因恢复令牌不再位于 oplog 中而被暂停时自动恢复。在恢复令牌丢失和恢复进程完成之间,触发器不会处理任何错过的变更流事件。
在 用户界面中 创建或更新数据库triggerAtlas 时:
导航到要在暂停时自动恢复的trigger的配置页面。
在 Advanced (Optional) 部分,选择 Auto-Resume Triggers。
保存并部署更改。
手动恢复暂停的触发器
当您手动恢复暂停的触发器时,您的应用程序会在该变更流停止后尝试在下一个变更流事件时恢复触发器。如果恢复令牌不再位于集群 oplog 中,则必须在没有恢复令牌的情况下启动触发器。这意味着触发器开始侦听新事件,但不会处理任何错过的过去事件。
您可以通过 扩展|service| 来调整oplog大小,以便在暂停后将恢复令牌保留更长时间。 集群 。保持oplog 大小比集群的峰值oplog 吞吐量(GB /小时)大几倍,以降低暂停trigger 的恢复令牌在oplog trigger执行之前从 中丢失的风险。在 |service| 中的oplogGB /Hour 图表中查看集群的 吞吐量 集群指标。oplog
您可以从trigger Atlas用户界面或使用 重新启动暂停的App Services CLI 。
AtlasGoTriggers在Atlas中,Go项目的 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击 Streaming Data 标题下的 Triggers。
会显示触发器页面。
对 MongoDB Atlas 用户进行身份验证
使用MongoDB Atlas Administration API密钥登录App Services CLI:
appservices login --api-key="<API KEY>" --private-api-key="<PRIVATE KEY>"
验证trigger配置文件是否存在
如果导出了应用程序的新副本,它应该已经包含已暂停trigger的最新配置文件。 您可以通过在/triggers 目录中查找与 同名的trigger 配置文件trigger 来确认配置文件是否存在。
触发时间报告
Atlas用户界面中的触发器列表显示三个时间戳:
最近修改
这是触发器的创建时间或最近更改时间。
最近心跳
Atlas会追踪上次运行trigger的时间。 如果trigger未发送任何事件,服务器会发送心跳以确保trigger的恢复令牌保持最新。 最新的事件显示为Latest Heartbeat 。
最后一次集群处理时间
Atlas还追踪Last Cluster Time Processed,这是支持trigger的变更流最后一次发出事件的时间。 如果自最近一次心跳以来没有发生任何事件,则它将比Latest Heartbeat更早。
性能优化
禁用突发操作的事件排序
如果您的触发器会在接收短暂事件(例如,插入数据是日常批处理作业的一部分)的集合上触发,则可以考虑禁用事件排序。
有序触发器等待执行特定事件的函数,直到先前事件的函数完成执行。因此,有序触发器实际上受到每个顺序触发器函数的运行时间的速率限制。这可能会导致出现在变更流上的数据库事件与触发器触发之间的明显延迟。在某些极端情况下,数据库事件可能会在长时间运行的有序触发器处理它们之前从 oplog 中脱离。
如果可能的话,无序触发器会并行执行函数,这可能会显著提高速度(取决于您的用例),但不保证多次执行触发器函数时遵循事件顺序。
禁用集合级原像
文档原像要求集群记录有关集合上每个操作的额外数据。一旦为集合上的任何触发器启用原像,集群就会存储集合上每个操作的原像。
额外的存储空间和计算开销可能会降低触发器性能,具体取决于集群配置。
为了避免原像的存储和计算开销,必须禁用整个底层 MongoDB 集合的原像。这是与任何单个触发器的原像设置不同的设置。
如果禁用了集合级别原像,则该集合上的任何活动触发器都不能使用原像。但是,如果删除或禁用了某个集合上的所有原像触发器,那么也可以禁用集合级别原像。
要学习;了解如何操作,请参阅禁用集合的原像。
使用匹配表达式来限制触发器调用
您可以通过在trigger 字段中指定 $match 表达式来限制Match Expression 调用次数。Atlas根据变更事件文档计算匹配表达式,仅当给定变更事件的表达式计算结果为 true 时才调用trigger 。
匹配表达式是一个 JSON 文档,它使用 MongoDB 读取查询语法来指定查询条件。
我们建议仅当 Trigger 事件的数量明显成为性能问题时才使用匹配表达式。在此之前,接收所有事件并在 Trigger 函数代码中单独对其处理。
变更事件文档的具体结构取决于导致触发器触发的事件。有关详细信息,请参阅各事件类型的参考文档:
仅当更改事件对象指定文档中的 status 字段发生更改时,以下匹配表达式才允许触发触发器。
updateDescription 是更新事件对象的一个字段。
{ "updateDescription.updatedFields.status": { "$exists": true } }
以下匹配表达式允许触发器仅在文档的 needsTriggerResponse 字段为 true 时触发。插入、更新和替换事件的 fullDocument 字段表示指定操作执行后的文档。要接收 fullDocument 字段,您必须在触发器配置中启用 Full Document。
{ "fullDocument.needsTriggerResponse": true }
测试匹配表达式
下面的步骤展示了一种测试匹配表达式是否按预期运行的方法:
将
DB_NAME替换为您的数据库名称,将COLLECTION_NAME替换为您的集合名称,将YOUR_MATCH_EXPRESSION替换为您要测试的匹配表达式,然后将以下内容粘贴到 mongosh 中以在现有集合上打开变更流:db.getSiblingDB(DB_NAME).COLLECTION_NAME.watch([{$match: YOUR_MATCH_EXPRESSION}]) while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { print(tojson(watchCursor.next())); } } 在另一个终端窗口中,使用
mongosh对集合中的某些测试文档进行更改。观察变更流筛选的内容。
使用投影表达式减少输入数据大小
在 Project Expression 字段中,使用 $project 表达式限制触发器处理的字段数量。
注意
项目仅包含在内
使用触发器时,投影表达式仅为包含性的。投影不支持将包含和排除相混合。投影表达式必须是包含性的,因为触发器要求包含 operationType。
如果要排除单个字段,则投影表达式必须包括要排除的字段以外的所有字段。您只能明确排除 _id,因为默认情况下包含 _id。
触发器配置有以下 Project Expression:
{ "_id": 0, "operationType": 1, "updateDescription.updatedFields.status": 1 }
Atlas传递给trigger Function 的变更事件对象仅包含投影中指定的字段,如以下示例:
{ "operationType": "update", "updateDescription": { "updatedFields": { "status": "InProgress" } } }
