Overview
在本指南中,您可以了解如何使用变更流来监控数据库的实时更改。 变更流是 MongoDB Server 的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。
变更流输出新的变更事件,提供对实时数据变更的访问权限。您可以在集合、数据库或客户端对象上打开变更流。
样本数据
本部分的示例使用以下 Course
结构作为 courses
集合中文档的模型:
type Course struct { Title string Enrollment int32 }
要运行本指南中的示例,请通过使用以下代码段将这些文档加载到 db
数据库中的 courses
集合中:
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
提示
不存在的数据库和集合
如果执行写操作时不存在必要的数据库和集合,服务器会隐式创建这些数据库和集合。
每个文档均包含某一大学课程的说明,其中包括课程标题和最大注册人数,而它们分别对应于每个文档中的 title
和enrollment
字段。
注意
每个示例输出都显示截断的 _data
、clusterTime
和 ObjectID
值,因为驱动程序会唯一生成这些值。
打开变更流
您可以对以下对象使用 Watch()
方法,监视MongoDB中的更改:
集合:监控对特定集合的更改
数据库:监控数据库中所有集合的更改
MongoClient:监控所有数据库的变更
对于每个对象,Watch()
方法都会打开一个更改流,以便在更改事件发生时发出更改事件文档。
Watch()
方法需要一个上下文参数和一个管道参数。要返回所有更改,请传入一个空的 Pipeline
对象。
Watch()
方法可以选择采用由聚合阶段数组组成的聚合管道作为第一个参数。聚合阶段筛选和转换变更事件。
例子
以下示例在 courses
集合上打开一个变更流,并在发生变更流事件时打印这些事件:
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
如果您在单独的程序或 Shell 中修改 courses
集合,则此代码在发生更改时打印输出更改。插入 title
值为 "Advanced Screenwriting"
且 enrollment
值为 20
的文档会导致以下更改事件:
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
要查看完全可运行的示例,请参阅本指南中的打开变更流示例:完整文件部分。
筛选更改事件
使用管道参数修改变更流输出。此参数允许您仅监视某些变更事件。将管道参数的格式设置为文档数组,每个文档表示一个聚合阶段。
您可以在此参数中使用以下管道阶段:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
下例在 db
数据库上打开变更流,但仅监视新的删除操作:
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
注意
在 db
数据库上调用了 Watch()
方法,因此代码会输出对此数据库内任何集合的新删除操作。
打开变更流示例:完整文件
注意
设置示例
此示例使用连接 URI 连接到MongoDB实例。要学习;了解有关连接到MongoDB实例的更多信息,请参阅创建 MongoClient指南。此示例还使用Atlas示例数据集包含的 sample_restaurants
数据库中的 restaurants
集合。您可以按照Atlas入门指南,将它们加载到MongoDB Atlas免费套餐上的数据库中。
以下示例在restaurants
collection 上打开 change stream 并打印插入的文档:
coll := client.Database("sample_restaurants").Collection("restaurants") // Creates instructions to watch for insert operations pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}} // Creates a change stream that receives change events cs, err := coll.Watch(context.TODO(), pipeline) if err != nil { panic(err) } defer cs.Close(context.TODO()) fmt.Println("Waiting For Change Events. Insert something in MongoDB!") // Prints a message each time the change stream receives an event for cs.Next(context.TODO()) { var event bson.M if err := cs.Decode(&event); err != nil { panic(err) } output, err := json.MarshalIndent(event["fullDocument"], "", " ") if err != nil { panic(err) } fmt.Printf("%s\n", output) } if err := cs.Err(); err != nil { panic(err) }
查看 完全可运行的示例。
预期结果
运行完整示例后,在不同的Shell中运行插入文档完整文件示例。运行插入操作时,您会看到类似以下内容的输出:
// results truncated { "_id": ..., "name": "8282", "cuisine": "Korean" }
重要
使用完此用法示例后,请确保通过关闭终端将其关闭。
配置变更流选项
使用 options
参数修改 Watch()
方法的行为。
您可以为 Watch()
方法指定以下选项:
ResumeAfter
StartAfter
FullDocument
FullDocumentBeforeChange
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
Comment
ShowExpandedEvents
Custom
CustomPipeline
有关这些选项的更多信息,请参阅 db. 集合.watch()服务器手册中的条目。
前像和后像
当您对集合执行任何 CRUD 操作时,默认情况下,相应的变更事件文档仅包含该操作修改的字段的增量。通过在 Watch()
方法的 options
参数中指定设置,您可以查看更改前后的完整文档以及增量。
如果要查看文档的后像,即文档更改后的完整版本,请将 options
参数的 FullDocument
字段设立为以下值之一:
UpdateLookup
:变更事件文档包括整个已变更文档的副本。WhenAvailable
:变更事件文档包括变更事件的已修改文档的后像(如果后像可用)。Required
:输出与WhenAvailable
相同,但如果后像不可用,驱动程序会引发服务器端错误。
如果要查看文档的前像,即更改之前文档的完整版本,请将 options
参数的 FullDocumentBeforeChange
字段设立为以下值之一:
WhenAvailable
:如果前像可用,则变更事件文档包括变更事件的已修改文档的前像。Required
:输出与WhenAvailable
的输出相同,但如果前像不可用,驱动程序会引发服务器端错误。
重要
要访问权限文档前像和后像,您必须为集合启用changeStreamPreAndPostImages
。有关说明和更多信息,请参阅MongoDB Server手册中 collMod 数据库命令指南的变更流部分。
注意
插入的文档没有前像,已删除的文档没有后像。
例子
以下示例对 courses
集合调用 Watch()
方法。它为 options
参数的 FullDocument
字段指定一个值,以输出整个已修改文档的副本,而不是仅输出已更改字段:
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
使用 "World
Fiction"
的 title
将文档的 enrollment
值从 35
更新为 30
会导致以下变更事件:
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
如果不指定 FullDocument
选项,同一更新操作将不再在变更事件文档中输出 "fullDocument"
值。
更多信息
有关变更流的更多信息,请参阅服务器手册中的变更流。
API 文档
要了解有关Watch()
方法的详情,请参阅以下 API 文档: