Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.
Tip
Atlas Stream Processing
As an alternative to change streams, you can use Atlas Stream Processing to process and transform streams of data. Unlike change streams, which register only database events, Atlas Stream Processing manages multiple data event types and provides extended data processing capabilities. To learn more about this feature, see Atlas Stream Processing in the MongoDB Atlas documentation.
Sample Data
The examples in this guide use the sample_restaurants.restaurants collection from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with the .NET/C# Driver guide.
The examples on this page use the following Restaurant, Address, and GradeEntry classes as models:
public class Restaurant
{
    public ObjectId Id { get; set; }

    public string Name { get; set; }

    [BsonElement("restaurant_id")]
    public string RestaurantId { get; set; }

    public string Cuisine { get; set; }

    public Address Address { get; set; }

    public string Borough { get; set; }

    public List<GradeEntry> Grades { get; set; }
}

public class Address
{
    public string Building { get; set; }

    [BsonElement("coord")]
    public double[] Coordinates { get; set; }

    public string Street { get; set; }

    [BsonElement("zipcode")]
    public string ZipCode { get; set; }
}

public class GradeEntry
{
    public DateTime Date { get; set; }

    public string Grade { get; set; }

    public float? Score { get; set; }
}
Note
The documents in the restaurants collection use the snake-case naming convention. The examples in this guide use a ConventionPack to deserialize the fields in the collection into Pascal case and map them to the properties in the Restaurant class.
To learn more about custom serialization, see Custom Serialization.
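As a rough illustration of the convention pack mentioned in the preceding note, the following sketch registers the driver's CamelCaseElementNameConvention for all types and serializes a small object. The choice of convention, the pack name "CamelCase", and the Sample class are assumptions made for this illustration; this guide does not show the registration code that its examples use.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Conventions;

// Assumption: a camel-case convention pack applied to every type.
var pack = new ConventionPack { new CamelCaseElementNameConvention() };
ConventionRegistry.Register("CamelCase", pack, type => true);

// Serializing a Pascal-case class now produces camel-case element names.
var document = new Sample { Name = "Blarney Castle", Borough = "Queens" }.ToBsonDocument();
Console.WriteLine(document);

// Hypothetical model class used only for this sketch.
public class Sample
{
    public string Name { get; set; }
    public string Borough { get; set; }
}
```

Register the pack before the first serialization of the affected types, because the driver builds and caches class maps the first time it encounters a type.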
Open a Change Stream
To open a change stream, call the Watch() or WatchAsync() method. The instance on which you call the method determines the scope of events that the change stream listens for. You can call the Watch() or WatchAsync() method on the following classes:
- MongoClient: To monitor all changes in the MongoDB deployment
- Database: To monitor changes in all collections in the database
- Collection: To monitor changes in the collection
The following example opens a change stream on the restaurants collection and outputs the changes as they occur. The synchronous version appears first, followed by the asynchronous version.
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");

// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
    foreach (var change in cursor.ToEnumerable())
    {
        Console.WriteLine("Received the following change: " + change.BackingDocument);
    }
}

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");

// Opens a change stream and prints the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
    Console.WriteLine("Received the following change: " + change.BackingDocument);
});
To begin watching for changes, run the application. Then, in a separate application or shell, modify the restaurants collection. Updating a document that has a "name" value of "Blarney Castle" results in the following change stream output:
{
  "_id" : { "_data" : "..." },
  "operationType" : "update",
  "clusterTime" : Timestamp(...),
  "wallTime" : ISODate("..."),
  "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
  "documentKey" : { "_id" : ObjectId("...") },
  "updateDescription" : {
    "updatedFields" : { "cuisine" : "Irish" },
    "removedFields" : [],
    "truncatedArrays" : []
  }
}
Modify the Change Stream Output
You can pass the pipeline parameter to the Watch() and WatchAsync() methods to modify the change stream output. This parameter allows you to watch for only specified change events. Create the pipeline by using the EmptyPipelineDefinition class and appending the relevant aggregation stage methods.
You can specify the following aggregation stages in the pipeline parameter:
- $addFields
- $changeStreamSplitLargeEvent
- $match
- $project
- $replaceRoot
- $replaceWith
- $redact
- $set
- $unset
Tip
To learn how to build an aggregation pipeline by using the PipelineDefinitionBuilder class, see Aggregation Pipeline Stages in the Operations with Builders guide.
To learn more about modifying your change stream output, see the Modify Change Stream Output section in the MongoDB Server manual.
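As a minimal sketch of building such a pipeline, the following code appends a single $match stage that keeps only insert and delete events. It filters on the raw operationType field with a BsonDocument filter and uses BsonDocument as the document type; both choices are assumptions made to keep the sketch independent of the Restaurant model.

```csharp
using System;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Driver;

// Filter on the raw "operationType" field of each change event.
FilterDefinition<ChangeStreamDocument<BsonDocument>> filter = new BsonDocument(
    "operationType",
    new BsonDocument("$in", new BsonArray { "insert", "delete" }));

// Start from an empty pipeline and append the $match stage.
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(filter);

// The pipeline can then be passed to Watch() or WatchAsync().
Console.WriteLine("Stages in pipeline: " + pipeline.Stages.Count());
```

Building the pipeline does not contact the server. The stages are sent only when you pass the pipeline to Watch() or WatchAsync().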
Monitor Update Events Example
The following example uses the pipeline parameter to open a change stream that records only update operations. The synchronous version appears first, followed by the asynchronous version.
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);

// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch(pipeline))
{
    foreach (var change in cursor.ToEnumerable())
    {
        Console.WriteLine("Received the following change: " + change);
    }
}

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);

// Opens a change stream and prints the changes as they're received
using (var cursor = await collection.WatchAsync(pipeline))
{
    await cursor.ForEachAsync(change =>
    {
        Console.WriteLine("Received the following change: " + change);
    });
}
Split Large Change Events Example
If your application generates change events that exceed 16 MB in size, the server returns a BSONObjectTooLarge error. To avoid this error, you can use the $changeStreamSplitLargeEvent pipeline stage to split the events into smaller fragments. The .NET/C# Driver aggregation API includes the ChangeStreamSplitLargeEvent() method, which you can use to add the $changeStreamSplitLargeEvent stage to the change stream pipeline.
This example instructs the driver to watch for changes and split change events that exceed the 16 MB limit. The code prints the change document for each event and calls helper methods to reassemble any event fragments:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .ChangeStreamSplitLargeEvent();

using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
    Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .ChangeStreamSplitLargeEvent();

using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
    Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
Note
We recommend reassembling change event fragments, as shown in the preceding example, but this step is optional. You can use the same logic to watch both split and complete change events.
The preceding example uses the GetNextChangeStreamEvent(), GetNextChangeStreamEventAsync(), and MergeFragment() methods to reassemble change event fragments into a single change stream document. The following code defines these methods:
// Fetches the next complete change stream event
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
    IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
    while (changeStreamEnumerator.MoveNext())
    {
        var changeStreamEvent = changeStreamEnumerator.Current;
        if (changeStreamEvent.SplitEvent != null)
        {
            var fragment = changeStreamEvent;
            while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
            {
                changeStreamEnumerator.MoveNext();
                fragment = changeStreamEnumerator.Current;
                MergeFragment(changeStreamEvent, fragment);
            }
        }
        yield return changeStreamEvent;
    }
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
    ChangeStreamDocument<TDocument> changeStreamEvent,
    ChangeStreamDocument<TDocument> fragment)
{
    foreach (var element in fragment.BackingDocument)
    {
        if (element.Name != "_id" && element.Name != "splitEvent")
        {
            changeStreamEvent.BackingDocument[element.Name] = element.Value;
        }
    }
}
// Fetches the next complete change stream event
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
    IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
    var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
    while (await changeStreamEnumerator.MoveNextAsync())
    {
        var changeStreamEvent = changeStreamEnumerator.Current;
        if (changeStreamEvent.SplitEvent != null)
        {
            var fragment = changeStreamEvent;
            while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
            {
                await changeStreamEnumerator.MoveNextAsync();
                fragment = changeStreamEnumerator.Current;
                MergeFragment(changeStreamEvent, fragment);
            }
        }
        yield return changeStreamEvent;
    }
}

// Yields each event or event fragment from the cursor's batches, one at a time
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
    IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
    while (await changeStreamCursor.MoveNextAsync())
    {
        foreach (var changeStreamEvent in changeStreamCursor.Current)
        {
            yield return changeStreamEvent;
        }
    }
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
    ChangeStreamDocument<TDocument> changeStreamEvent,
    ChangeStreamDocument<TDocument> fragment)
{
    foreach (var element in fragment.BackingDocument)
    {
        if (element.Name != "_id" && element.Name != "splitEvent")
        {
            changeStreamEvent.BackingDocument[element.Name] = element.Value;
        }
    }
}
Tip
To learn more about splitting large change events, see $changeStreamSplitLargeEvent in the MongoDB Server manual.
Modify Watch() Behavior
The Watch() and WatchAsync() methods accept optional parameters, which represent options you can use to configure the operation. If you don't specify any options, the driver does not customize the operation.
The following table describes the options you can set to customize the behavior of Watch() and WatchAsync():
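| Option | Description |
|---|---|
| FullDocument | Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
| FullDocumentBeforeChange | Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
| ResumeAfter | Directs Watch() or WatchAsync() to resume returning changes after the operation specified in the resume token. |
| StartAfter | Directs Watch() or WatchAsync() to return only events that occur after the operation specified in the resume token. |
| StartAtOperationTime | Directs Watch() or WatchAsync() to return only events that occur at or after the specified timestamp. |
| MaxAwaitTime | Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds. |
| ShowExpandedEvents | Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. Set this option to true to include expanded events in the change stream. |
| BatchSize | Specifies the maximum number of documents that a change stream can return in each batch, which applies to the aggregate command. |
| Collation | Specifies the collation to use for the change stream cursor. See the Collation section of this page for more information. |
| Comment | Attaches a comment to the operation. |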
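The following sketch shows one way to set several of these options on a ChangeStreamOptions object. The specific values (a batch size of 100, a two-second wait, and the comment text) are arbitrary choices for illustration and are not recommendations from this guide.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

var options = new ChangeStreamOptions
{
    // Return at most 100 change events per batch (illustrative value).
    BatchSize = 100,
    // Wait up to two seconds for new changes before returning an empty batch.
    MaxAwaitTime = TimeSpan.FromSeconds(2),
    // Include DDL events; requires MongoDB Server v6.0 or later.
    ShowExpandedEvents = true,
    // Free-form comment attached to the operation (illustrative text).
    Comment = "restaurant change watcher"
};

Console.WriteLine("Batch size: " + options.BatchSize);
```

Pass the object as the options argument, for example collection.Watch(pipeline, options), in the same way as the later examples in this guide.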
Collation
To configure collation for your operation, create an instance of the Collation class.
The following table describes the parameters that the Collation constructor accepts. It also lists the corresponding class property that you can use to read each setting's value.
| Parameter | Description | Class Property |
|---|---|---|
| locale | Specifies the International Components for Unicode (ICU) locale. For a list of supported locales, see Collation Locales and Default Parameters in the MongoDB Server manual. | Locale |
| caseLevel | (Optional) Specifies whether to include case comparison. | CaseLevel |
| caseFirst | (Optional) Specifies the sort order of case differences during tertiary level comparisons. | CaseFirst |
| strength | (Optional) Specifies the level of comparison to perform, as defined in the ICU documentation. | Strength |
| numericOrdering | (Optional) Specifies whether the driver compares numeric strings as numbers. | NumericOrdering |
| alternate | (Optional) Specifies whether the driver considers whitespace and punctuation as base characters for purposes of comparison. | Alternate |
| maxVariable | (Optional) Specifies which characters the driver considers ignorable when the alternate parameter is set to CollationAlternate.Shifted. | MaxVariable |
| normalization | (Optional) Specifies whether the driver normalizes text as needed. | Normalization |
| backwards | (Optional) Specifies whether strings containing diacritics sort from the back of the string to the front. | Backwards |
For more information about collation, see the Collation page in the MongoDB Server manual.
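As a short sketch of the constructor described above, the following code creates a Collation instance with named optional arguments and assigns it to the Collation option of a ChangeStreamOptions object. The "en_US" locale and the chosen strength and numeric-ordering values are assumptions for illustration only.

```csharp
using System;
using MongoDB.Driver;

// Only locale is required; the remaining parameters are optional
// and can be supplied by name.
var collation = new Collation(
    "en_US",
    strength: CollationStrength.Secondary,
    numericOrdering: true);

// Attach the collation to the change stream options.
var options = new ChangeStreamOptions { Collation = collation };

// Each constructor parameter is readable through the matching property.
Console.WriteLine(collation.Locale);
Console.WriteLine(collation.Strength);
Console.WriteLine(collation.NumericOrdering);
```

Parameters that you omit are left unset on the Collation instance, so the server applies its defaults for those settings.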
Include Pre-Images and Post-Images
Important
You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.
By default, when you perform an operation on a collection, the corresponding change event includes only the delta of the fields modified by that operation. To see the full document before or after a change, create a ChangeStreamOptions object and specify the FullDocumentBeforeChange or the FullDocument options. Then, pass the ChangeStreamOptions object to the Watch() or WatchAsync() method.
The pre-image is the full version of a document before a change. To include the pre-image in the change stream event, set the FullDocumentBeforeChange option to one of the following values:
- ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable: The change event includes a pre-image of the modified document only if the pre-image is available.
- ChangeStreamFullDocumentBeforeChangeOption.Required: The change event includes a pre-image of the modified document. If the pre-image is not available, the driver raises an error.
The post-image is the full version of a document after a change. To include the post-image in the change stream event, set the FullDocument option to one of the following values:
- ChangeStreamFullDocumentOption.UpdateLookup: The change event includes a copy of the entire changed document from some time after the change.
- ChangeStreamFullDocumentOption.WhenAvailable: The change event includes a post-image of the modified document only if the post-image is available.
- ChangeStreamFullDocumentOption.Required: The change event includes a post-image of the modified document. If the post-image is not available, the driver raises an error.
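The following sketch combines both options and also shows one way to enable pre-images and post-images on a collection by running the server's collMod command with the changeStreamPreAndPostImages field. The PreImageSketch class and its method names are hypothetical helpers introduced for this illustration, and BsonDocument is used as the document type to keep the sketch independent of the Restaurant model.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical helper class used only for this sketch.
public static class PreImageSketch
{
    // Builds the collMod command that enables pre- and post-images on a collection.
    public static BsonDocument BuildEnableImagesCommand(string collectionName) =>
        new BsonDocument
        {
            { "collMod", collectionName },
            { "changeStreamPreAndPostImages", new BsonDocument("enabled", true) }
        };

    // Requests pre-images and post-images whenever the server has them available.
    public static ChangeStreamOptions BuildOptions() =>
        new ChangeStreamOptions
        {
            FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable,
            FullDocument = ChangeStreamFullDocumentOption.WhenAvailable
        };

    // Enables the images, opens the change stream, and prints both versions of each document.
    public static void Watch(IMongoDatabase database, string collectionName)
    {
        database.RunCommand<BsonDocument>(BuildEnableImagesCommand(collectionName));

        var collection = database.GetCollection<BsonDocument>(collectionName);
        using var cursor = collection.Watch(BuildOptions());
        foreach (var change in cursor.ToEnumerable())
        {
            Console.WriteLine("Before: " + change.FullDocumentBeforeChange);
            Console.WriteLine("After: " + change.FullDocument);
        }
    }
}
```

With WhenAvailable, either property can be null when the server has no stored image for an event, so check for null before reading fields from it.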
The following example opens a change stream on a collection and includes the post-image of updated documents by specifying the FullDocument option. The synchronous version appears first, followed by the asynchronous version.
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);

var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};

using (var cursor = collection.Watch(pipeline, options))
{
    foreach (var change in cursor.ToEnumerable())
    {
        Console.WriteLine(change.FullDocument.ToBsonDocument());
    }
}

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);

var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};

using var cursor = await collection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
    Console.WriteLine(change.FullDocument.ToBsonDocument());
});
Running the preceding code example and updating a document that has a "name" value of "Blarney Castle" results in the following change stream output:
{
  "_id" : ObjectId("..."),
  "name" : "Blarney Castle",
  "restaurant_id" : "40366356",
  "cuisine" : "Traditional Irish",
  "address" : {
    "building" : "202-24",
    "coord" : [-73.925044200000002, 40.5595462],
    "street" : "Rockaway Point Boulevard",
    "zipcode" : "11697"
  },
  "borough" : "Queens",
  "grades" : [...]
}
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
Additional Information
To learn more about change streams, see Change Streams in the MongoDB Server manual.
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: