Overview
在本指南中,您可以学习;了解如何使用变更流来监控数据的实时变更。 变更流是MongoDB Server的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。
使用Ruby驱动程序时,可以调用 watch 方法返回 Mongo::Collection::View::ChangeStream对象。然后,您可以遍历其内容以监控数据更改,例如更新、插入和删除。
样本数据
本指南中的示例使用Atlas示例数据集的 sample_restaurants数据库中的 restaurants集合。要从Ruby应用程序访问权限此集合,请创建一个连接到Atlas 集群的Mongo::Client对象,并将以下值分配给 database 和 collection 变量:
database = client.use('sample_restaurants') collection = database[:restaurants]
要学习如何创建免费的MongoDB Atlas 集群并加载示例数据集,请参阅MongoDB 入门指南。
打开变更流
要打开变更流,请调用 watch 方法。您对其调用 watch 方法的对象决定了变更流监控的事件范围。您可以对以下对象调用 watch 方法
Mongo::Database:监控一个数据库中所有集合的变更Mongo::Collection:监控对一个集合的更改
以下示例在restaurants集合上打开变更流,并在发生变更时输出变更:
stream = collection.watch stream.each do |doc| puts doc break if doc['operationType'] == 'invalidate' end
要开始监视更改,请运行前面的代码。 然后,在另一个shell中修改 restaurants集合。 以下示例更新了name字段值为'Blarney Castle'的文档:
collection.update_one( { 'name' => 'Blarney Castle' }, { '$set' => { 'cuisine' => 'Irish' } } )
更新集合时,变更流应用程序会在发生变更时打印变更。 打印的变更事件类似于以下输出:
{"_id"=>{"_data"=>"..."}, "operationType"=>"update", "clusterTime"=>#<...>, "ns"=>{"db"=>"sample_restaurants", "coll"=>"restaurants"}, "documentKey"=> {"_id"=>BSON::ObjectId('...')}, "updateDescription"=>{"updatedFields"=> {"cuisine"=>"Irish"}, "removedFields"=>[], "truncatedArrays"=>[]}}
修改变更流输出
要修改变更流输出,可以将大量中的管道阶段作为参数传递给watch方法。 您可以在大量中包含以下阶段:
$addFields或$set:向文档添加新字段$match:筛选文档$project:投影文档字段的子集$replaceWith或$replaceRoot:将输入文档替换为指定文档$redact:限制文档内容$unset:从文档中删除字段
以下示例将包含$match阶段的管道传递给watch方法。 这指示watch方法仅在发生更新操作时输出事件:
pipeline = [{ '$match' => { 'operationType' => 'update' } }] stream = collection.watch(pipeline) stream.each do |doc| puts doc break if doc['operationType'] == 'invalidate' end
修改手表行为
要修改 watch 方法的行为,您可以将选项哈希作为参数传递给 watch。下表描述了一些可以设立的选项:
选项 | 说明 |
|---|---|
| |
| 指定是否显示更改前的完整文档,而不是仅显示对文档所做的更改。 要了解有关此选项的更多信息,请参阅包括前图像和后图像。 |
| 指定变更流的逻辑点。此选项与 |
| 指示变更流仅提供在指定时间戳时或之后发生的变更。此选项与 |
| 设置用于变更流游标的排序规则。 |
有关 watch 选项的完整列表,请参阅API文档中的监视。
包含前像和后像
重要
仅当您的部署使用 MongoDB v 6.0或更高版本时,才能对集合启用前图像和后图像。
默认下,当您对集合执行操作时,相应的变更事件仅包括操作前后修改的字段及其值。
您可以指示 watch 方法返回文档的前像,即更改前文档的完整版本以及修改后的字段。要在变更流事件中包含前像,请将选项哈希传递给设置 full_document_before_change 选项的 watch。您可以将此选项设立为以下字符串值:
'whenAvailable':变更事件包括变更事件的已修改文档的前像。 如果前像不可用,则此变更事件字段的值为nil。'required':变更事件包括变更事件的已修改文档的前像。 如果前像不可用,服务器将引发错误。'off':(默认)更改事件不包括已修改文档的前像。
您还可以指示 watch 方法返回文档的 后像,即除了修改后的字段之外的文档更改后的完整版本。要将后图像包含在变更流事件中,请将选项哈希传递给设置 full_document 选项的 watch。您可以将此选项设立为以下字符串值:
'updateLookup':更改事件包括更改后某个时间点的整个已更改文档的副本。'whenAvailable':变更事件包括变更事件的已修改文档的后像。 如果后像不可用,则此变更事件字段的值为nil。'required':变更事件包括变更事件的已修改文档的后像。 如果后图像不可用,服务器将引发错误。'default':(默认)更改事件不包括已修改文档的后像。
以下示例对集合调用watch方法,并通过设置full_document选项包含更新文档的后像:
options = { full_document: 'updateLookup' } stream = collection.watch([], options) stream.each do |doc| puts doc break if doc['operationType'] == 'invalidate' end
当变更流应用程序在单独的shell中运行时,使用前面的更新示例更新restaurants集合中的文档会打印类似于以下输出的变更事件:
{"_id"=>{"_data"=>"..."}, "operationType"=>"update", "clusterTime"=> #<...1>, "wallTime"=>..., "fullDocument"=>{"_id"=>BSON::ObjectId('...'), "address"=>{"building"=>"202-24", "coord"=>[-73.9250442, 40.5595462], "street"=>"Rockaway Point Boulevard", "zipcode"=>"11697"}, "borough"=>"Queens", "cuisine"=>"Irish", "grades"=>[...], "name"=>"Blarney Castle", "restaurant_id"=>"40366356"}, "ns"=> {"db"=>"sample_restaurants", "coll"=>"restaurants"}, "documentKey"=> {"_id"=>BSON::ObjectId('...')}, "updateDescription"=>{"updatedFields"=> {"cuisine"=>"Irish"}, "removedFields"=>[], "truncatedArrays"=>[]}}
提示
要了解有关前图像和后图像的更多信息,请参阅Change Streams MongoDB Server手册中的 具有文档前图像和后图像的 。
更多信息
要了解有关变更流的更多信息,请参阅Change Streams MongoDB Server手册中的 。
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: