Docs Menu

Docs HomeGo

Watch for Changes

On this page

  • Overview
  • Sample Data
  • Open a Change Stream
  • Example
  • Modify the Change Stream Output
  • Example
  • Modify the Behavior of Watch()
  • Example
  • Additional Information
  • API Documentation

In this guide, you can learn how to monitor document changes with a change stream.

A change stream outputs new change events, providing access to real-time data changes. You can open a change stream on a collection, database, or client object.

To run the examples in this guide, load these documents into the tea.ratings collection with the following snippet:

coll := client.Database("tea").Collection("ratings")
docs := []interface{}{
bson.D{{"type", "Masala"}, {"rating", 10}},
bson.D{{"type", "Assam"}, {"rating", 5}},
bson.D{{"type", "Oolong"}, {"rating", 7}},
bson.D{{"type", "Earl Grey"}, {"rating", 8}},
bson.D{{"type", "English Breakfast"}, {"rating", 5}},
}
result, err := coll.InsertMany(context.TODO(), docs)
if err != nil {
panic(err)
}
fmt.Printf("Number of documents inserted: %d\n", len(result.InsertedIDs))

Tip

Non-existent Databases and Collections

If the necessary database and collection don't exist when you perform a write operation, the server implicitly creates them.

Each document contains a rating for a type of tea that corresponds to the type and rating fields.

Note

Each example truncates the _data, clusterTime, and ObjectID values because the driver generates them uniquely.

To open a change stream, use the Watch() method. The Watch() method requires a context parameter and a pipeline parameter. To return all changes, pass in an empty Pipeline object.

The following example opens a change stream on the tea.ratings collection and outputs all changes:

coll := client.Database("tea").Collection("ratings")
// open a change stream with an empty pipeline parameter
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// iterate over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

If you modify the tea.ratings collection in a separate shell, this code will print your changes. Inserting a document with a type value of "White Peony" and a rating value of 4 will output the following change event:

map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")]
fullDocument:map[_id:ObjectID("...") rating:4 type:White Peony] ns:
map[coll:ratings db:tea] operationType:insert]

Use the pipeline parameter to modify the change stream output. This parameter allows you to only watch for certain change events. Format the pipeline parameter as an array of documents, with each document representing an aggregation stage.

You can use the following pipeline stages in this parameter:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

The following example opens a change stream on the tea database, but only watches for new delete operations:

db := client.Database("tea")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

If you delete the tea.ratings document with a type value of "White Peony" in a separate shell, this code will output the following:

{"_id": {"_data": "..."},"operationType": "delete","clusterTime":
{"$timestamp":{"t":"...","i":"..."}},"ns": {"db": "tea","coll": "ratings"},
"documentKey": {"_id": {"$oid":"..."}}}

Note

The Watch() method was called on the tea database, so the code outputs new delete operations in any tea collection.

Use an opts parameter to modify the behavior of the Watch() method.

You can specify the following options in the opts parameter:

  • ResumeAfter

  • StartAfter

  • FullDocument

  • BatchSize

  • MaxAwaitTime

  • Collation

  • StartAtOperationTime

For more information on these fields, visit the MongoDB manual.

The following example calls the Watch() method on the tea.ratings collection. It specifies the FullDocument opts parameter to output a copy of the entire modified document:

coll := client.Database("tea").Collection("ratings")
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

If you update the rating value of the "Masala" tea from 10 to 9, this code outputs the following:

{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp":
{"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"type": "Masala","rating":
{"$numberInt":"9"}}, "ns": {"db": "tea","coll": "ratings"},"documentKey": {"_id":
{"$oid":"..."}}, "updateDescription": {"updatedFields": {"rating": {"$numberInt":"9"}},
"removedFields": [],"truncatedArrays": []}}

Without specifying the FullDocument option, the same update operation no longer outputs the "fullDocument" value.

For a runnable example of a change stream, see Watch for Changes.

For more information on change streams, see Change Streams.

To learn more about the Watch() method, visit the following API documentation links:

←  Search TextWrite Operations →
Give Feedback
© 2022 MongoDB, Inc.

About

  • Careers
  • Investor Relations
  • Legal Notices
  • Privacy Notices
  • Security Information
  • Trust Center
© 2022 MongoDB, Inc.