Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs 菜单
Docs 主页
/ / /
Go 驱动程序
/

使用变更流监控数据

在本指南中,您可以了解如何使用变更流来监控数据库的实时更改。 变更流是 MongoDB Server 的一项功能,允许应用程序订阅集合、数据库或部署上的数据更改。

变更流输出新的变更事件,提供对实时数据变更的访问权限。您可以在集合、数据库或客户端对象上打开变更流。

本部分的示例使用以下 Course 结构作为 courses 集合中文档的模型:

type Course struct {
Title string
Enrollment int32
}

要运行本指南中的示例,请通过使用以下代码段将这些文档加载到 db 数据库中的 courses 集合中:

coll := client.Database("db").Collection("courses")
docs := []interface{}{
Course{Title: "World Fiction", Enrollment: 35},
Course{Title: "Abstract Algebra", Enrollment: 60},
}
result, err := coll.InsertMany(context.TODO(), docs)

提示

不存在的数据库和集合

如果执行写操作时不存在必要的数据库和集合,服务器会隐式创建这些数据库和集合。

每个文档均包含某一大学课程的说明,其中包括课程标题和最大注册人数,而它们分别对应于每个文档中的 titleenrollment 字段。

注意

每个示例输出都显示截断的 _dataclusterTimeObjectID 值,因为驱动程序会唯一生成这些值。

您可以对以下对象使用 Watch() 方法,监视MongoDB中的更改:

  • 集合:监控对特定集合的更改

  • 数据库:监控数据库中所有集合的更改

  • MongoClient:监控所有数据库的变更

对于每个对象,Watch() 方法都会打开一个更改流,以便在更改事件发生时发出更改事件文档。

Watch() 方法需要一个上下文参数和一个管道参数。要返回所有更改,请传入一个空的 Pipeline对象。

Watch() 方法可以选择采用由聚合阶段数组组成的聚合管道作为第一个参数。聚合阶段筛选和转换变更事件。

以下示例在 courses集合上打开一个变更流,并在发生变更流事件时打印这些事件:

changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

如果您在单独的程序或 Shell 中修改 courses 集合,则此代码在发生更改时打印输出更改。插入 title 值为 "Advanced Screenwriting"enrollment 值为 20 的文档会导致以下更改事件:

map[_id:map[_data:...] clusterTime: {...}
documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...")
enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db]
operationType:insert]

要查看完全可运行的示例,请参阅本指南中的打开变更流示例:完整文件部分。

使用管道参数修改变更流输出。此参数允许您仅监视某些变更事件。将管道参数的格式设置为文档数组,每个文档表示一个聚合阶段。

您可以在此参数中使用以下管道阶段:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

下例在 db 数据库上打开变更流,但仅监视新的删除操作:

db := client.Database("db")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the delete operation change events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

注意

db 数据库上调用了 Watch() 方法,因此代码会输出对此数据库内任何集合的新删除操作。

注意

设置示例

此示例使用连接 URI 连接到MongoDB实例。要学习;了解有关连接到MongoDB实例的更多信息,请参阅创建 MongoClient指南。此示例还使用Atlas示例数据集包含的 sample_restaurants数据库中的 restaurants集合。您可以按照Atlas入门指南,将它们加载到MongoDB Atlas免费套餐上的数据库中。

以下示例在restaurants collection 上打开 change stream 并打印插入的文档:

coll := client.Database("sample_restaurants").Collection("restaurants")
// Creates instructions to watch for insert operations
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}}
// Creates a change stream that receives change events
cs, err := coll.Watch(context.TODO(), pipeline)
if err != nil {
panic(err)
}
defer cs.Close(context.TODO())
fmt.Println("Waiting For Change Events. Insert something in MongoDB!")
// Prints a message each time the change stream receives an event
for cs.Next(context.TODO()) {
var event bson.M
if err := cs.Decode(&event); err != nil {
panic(err)
}
output, err := json.MarshalIndent(event["fullDocument"], "", " ")
if err != nil {
panic(err)
}
fmt.Printf("%s\n", output)
}
if err := cs.Err(); err != nil {
panic(err)
}

查看 完全可运行的示例。

运行完整示例后,在不同的Shell中运行插入文档完整文件示例。运行插入操作时,您会看到类似以下内容的输出:

// results truncated {
"_id": ..., "name": "8282", "cuisine": "Korean"
}

重要

使用完此用法示例后,请确保通过关闭终端将其关闭。

使用 options 参数修改 Watch() 方法的行为。

您可以为 Watch() 方法指定以下选项:

  • ResumeAfter

  • StartAfter

  • FullDocument

  • FullDocumentBeforeChange

  • BatchSize

  • MaxAwaitTime

  • Collation

  • StartAtOperationTime

  • Comment

  • ShowExpandedEvents

  • Custom

  • CustomPipeline

有关这些选项的更多信息,请参阅 db. 集合.watch()服务器手册中的条目。

当您对集合执行任何 CRUD 操作时,默认情况下,相应的变更事件文档仅包含该操作修改的字段的增量。通过在 Watch() 方法的 options 参数中指定设置,您可以查看更改前后的完整文档以及增量。

如果要查看文档的后像,即文档更改后的完整版本,请将 options 参数的 FullDocument字段设立为以下值之一:

  • UpdateLookup:变更事件文档包括整个已变更文档的副本。

  • WhenAvailable:变更事件文档包括变更事件的已修改文档的后像(如果后像可用)。

  • Required:输出与 WhenAvailable 相同,但如果后像不可用,驱动程序会引发服务器端错误。

如果要查看文档的前像,即更改之前文档的完整版本,请将 options 参数的 FullDocumentBeforeChange字段设立为以下值之一:

  • WhenAvailable:如果前像可用,则变更事件文档包括变更事件的已修改文档的前像。

  • Required:输出与 WhenAvailable 的输出相同,但如果前像不可用,驱动程序会引发服务器端错误。

重要

要访问权限文档前像和后像,您必须为集合启用changeStreamPreAndPostImages。有关说明和更多信息,请参阅MongoDB Server手册中 collMod 数据库命令指南的变更流部分。

注意

插入的文档没有前像,已删除的文档没有后像。

以下示例对 courses 集合调用 Watch() 方法。它为 options 参数的 FullDocument 字段指定一个值,以输出整个已修改文档的副本,而不是仅输出已更改字段:

opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

使用 "World Fiction"title 将文档的 enrollment 值从 35 更新为 30 会导致以下变更事件:

{"_id": {"_data": "..."},"operationType": "update","clusterTime":
{"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id":
{"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}},
"ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}},
"updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}

如果不指定 FullDocument 选项,同一更新操作将不再在变更事件文档中输出 "fullDocument" 值。

有关变更流的更多信息,请参阅服务器手册中的变更流

要了解有关Watch()方法的详情,请参阅以下 API 文档:

后退

日志记录

在此页面上