Overview
通过本指南,您可以学习如何使用变更流来监视文档更改。
变更流输出新的变更事件,提供对实时数据变更的访问权限。您可以在集合、数据库或客户端对象上打开变更流。
样本数据
本部分的示例使用以下 Course 结构作为 courses 集合中文档的模型:
type Course struct { Title string Enrollment int32 }
要运行本指南中的示例,请通过使用以下代码段将这些文档加载到 db 数据库中的 courses 集合中:
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
提示
不存在的数据库和集合
如果执行写操作时不存在必要的数据库和集合,服务器会隐式创建这些数据库和集合。
每个文档均包含某一大学课程的说明,其中包括课程标题和最大注册人数,而它们分别对应于每个文档中的 title和enrollment 字段。
注意
每个示例输出都显示截断的 _data、clusterTime 和 ObjectID 值,因为驱动程序会唯一生成这些值。
打开变更流
要打开变更流,请使用 Watch() 方法。Watch() 方法需要一个上下文参数和一个管道参数。要返回所有更改,请输入一个空的 Pipeline 对象。
例子
以下示例在 courses 集合上打开变更流并输出所有更改:
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
如果您在单独的程序或 Shell 中修改 courses 集合,则此代码在发生更改时打印输出更改。插入 title 值为 "Advanced Screenwriting" 且 enrollment 值为 20 的文档会导致以下更改事件:
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
修改变更流输出
使用管道参数修改变更流输出。此参数允许您仅监视某些变更事件。将管道参数的格式设置为文档数组,每个文档表示一个聚合阶段。
您可以在此参数中使用以下管道阶段:
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
例子
下例在 db 数据库上打开变更流,但仅监视新的删除操作:
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
注意
在 db 数据库上调用了 Watch() 方法,因此代码会输出对此数据库内任何集合的新删除操作。
修改以下方法的行为: Watch()
使用 options 参数修改 Watch() 方法的行为。
您可以为 Watch() 方法指定以下选项:
ResumeAfterStartAfterFullDocumentFullDocumentBeforeChangeBatchSizeMaxAwaitTimeCollationStartAtOperationTimeCommentShowExpandedEventsStartAtOperationTimeCustomCustomPipeline
有关这些选项的更多信息,请访问 MongoDB Server手册。
前像和后像
当您对集合执行任何 CRUD 操作时,默认情况下,相应的变更事件文档仅包含该操作修改的字段的增量。通过在 Watch() 方法的 options 参数中指定设置,您可以查看更改前后的完整文档以及增量。
如果要查看文档的后像,即文档更改后的完整版本,请将 options 参数的 FullDocument 字段设置为以下值之一:
UpdateLookup:变更事件文档包括整个已变更文档的副本。WhenAvailable:变更事件文档包括变更事件的已修改文档的后像(如果后像可用)。Required:输出与WhenAvailable相同,但如果后像不可用,驱动程序会引发服务器端错误。
如果要查看文档的前像,即文档更改前的完整版本,请将 options 参数的 FullDocumentBeforeChange 字段设置为以下值之一:
WhenAvailable:如果前像可用,则变更事件文档包括变更事件的已修改文档的前像。Required:输出与WhenAvailable的输出相同,但如果前像不可用,驱动程序会引发服务器端错误。
重要
要访问文档前像和后像,您必须为集合启用 changeStreamPreAndPostImages。请参阅 MongoDB Server 手册,获取说明和更多信息。
注意
插入的文档没有前像,已删除的文档没有后像。
例子
以下示例对 courses 集合调用 Watch() 方法。它为 options 参数的 FullDocument 字段指定一个值,以输出整个已修改文档的副本,而不是仅输出已更改字段:
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
使用 "World Fiction" 的 title 将文档的 enrollment 值从 35 更新为 30 会导致以下变更事件:
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
如果不指定 FullDocument 选项,同一更新操作将不再在变更事件文档中输出 "fullDocument" 值。
更多信息
有关变更流的可运行示例,请参阅 打开变更流示例。
有关 变更流 的更多信息,请参阅Change Streams 。
API 文档
要了解有关 Watch() 方法的更多信息,请访问以下 API 文档链接: