How can I safely serialize change stream documents with C#?

I am using the C# change stream API to watch for all events on a given database. I would like to capture the raw change stream documents (which are ChangeStreamDocument objects) so that I can replay them in unit tests for my translation logic (I convert these ChangeStreamDocument objects into the ChangeStreamMessage objects that we use internally). Having a set of the original messages serialized (outside of the oplog) lets me avoid the change stream API during these tests while still validating the translation logic with regression tests.

I’m not sure what the best way is to serialize this data safely - I’ve seen examples for how to do this when reading from a collection (both serializing and deserializing), but not for change events. The examples use the initialized collection object to get a document serializer - see below:

What is the best way to do this for arbitrary change stream documents from the DB? I want to make sure that the internal state of the object isn’t corrupted at all when it is reloaded for testing.

What do you recommend?



Here is an example of some of the code that is watching the database and handing off an IEnumerable<ChangeStreamDocument> for use by the translator - I’d like to be able to replace this with something that can just load from a file, for example. What is a safe way to reliably do this given the MongoDB.Bson libraries?

        public IEnumerable<ChangeStreamDocument<BsonDocument>> InfiniteWatch(ChangeStreamOptions options, int millisecondsToDelayBetweenReads = 50)
        {
            using (var cursor = this.Database.Watch(options))
            {
                while (cursor.MoveNext())
                {
                    if (!cursor.Current.Any())
                    {
                        Thread.Sleep(millisecondsToDelayBetweenReads); // Avoid hitting the database too hard while polling
                        continue; // no data
                    }

                    // Hand each change event in the current batch to the caller
                    foreach (var document in cursor.Current)
                    {
                        yield return document;
                    }
                }
            }
        }

PS - @wan - any pointers on this change stream question? (I saw you had answered a previous question about change streams and thought this might be right up your alley)

It looks like I got back a reliable result from doing the following (so far):

var doc = new ChangeStreamDocument<BsonDocument>(changeStreamEvent.BackingDocument,
                    new BsonDocumentSerializer());

This allows reconstructing the change stream document from a backing document - if I serialize the BackingDocument (BsonDocument), then I should be able to pick it back up and utilize it. I’m planning to use MongoDB.Bson libs to do this serialization to make sure that I don’t introduce incompatibilities through use of Newtonsoft. If anyone has a pointer to the best usage of those libraries, I’d very much appreciate it!
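
A minimal sketch of that idea, assuming binary BSON on disk is acceptable for the test fixtures. The `ChangeEventFile` class and its `Save`/`Load` methods are hypothetical names used only for illustration. The calls it relies on are `ToBson()` and `BsonSerializer.Deserialize<BsonDocument>` from MongoDB.Bson, plus the same `ChangeStreamDocument` constructor shown above:

```csharp
using System.IO;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver;

// Hypothetical helper (not part of the driver): stores and reloads the raw backing document.
public static class ChangeEventFile
{
    public static void Save(string path, ChangeStreamDocument<BsonDocument> changeEvent)
    {
        // ToBson() writes binary BSON, so BSON types are not converted to JSON on the way out.
        File.WriteAllBytes(path, changeEvent.BackingDocument.ToBson());
    }

    public static ChangeStreamDocument<BsonDocument> Load(string path)
    {
        // Read the bytes back into a BsonDocument and wrap it again.
        var backingDocument = BsonSerializer.Deserialize<BsonDocument>(File.ReadAllBytes(path));
        return new ChangeStreamDocument<BsonDocument>(backingDocument, BsonDocumentSerializer.Instance);
    }
}
```

Binary files are not human-readable, so this only suits fixtures that nobody needs to inspect or edit by hand.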

I hunted through the open source code for an example - here is a way that I found to reliably serialize and deserialize the ChangeStreamDocument:

        // given: ChangeStreamDocument<BsonDocument> changeStreamDocument; // received from MongoDB Change Streams
        var subject = new ChangeStreamDocumentSerializer<BsonDocument>(BsonDocumentSerializer.Instance);

        string json;
        using (var textWriter = new StringWriter())
        using (var writer = new MongoDB.Bson.IO.JsonWriter(textWriter))
        {
            var context = BsonSerializationContext.CreateRoot(writer);
            subject.Serialize(context, changeStreamDocument);
            json = textWriter.ToString();
        }

If you need to serialize a change stream object, this is the way to do it while using the MongoDB.Bson libraries without creating artifacts in the data (from what I can see so far). Reading them back simply involves:

    // given: string json; // the original json that was recorded from above
    ChangeStreamDocument<BsonDocument> changeStreamDocument;
    var subject = new ChangeStreamDocumentSerializer<BsonDocument>(BsonDocumentSerializer.Instance);

    using (var reader = new MongoDB.Bson.IO.JsonReader(json))
    {
        var context = BsonDeserializationContext.CreateRoot(reader);
        changeStreamDocument = subject.Deserialize(context);
    }

I’ve been able to verify that these reliably work over a variety of change stream documents and objects we used.
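
For replacing the live watcher in tests, the read path above can be wrapped in a file-backed enumerable. This is only a sketch, and it assumes each recorded JSON document was written on its own line of the file. The `RecordedChangeStream` class, its `ReadAll` method, and the file layout are hypothetical and not something the driver provides:

```csharp
using System.Collections.Generic;
using System.IO;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver;

// Hypothetical test helper: replays recorded change events from a text file.
public static class RecordedChangeStream
{
    private static readonly ChangeStreamDocumentSerializer<BsonDocument> Serializer =
        new ChangeStreamDocumentSerializer<BsonDocument>(BsonDocumentSerializer.Instance);

    // Assumption: one serialized change stream document per line.
    public static IEnumerable<ChangeStreamDocument<BsonDocument>> ReadAll(string path)
    {
        foreach (var line in File.ReadLines(path))
        {
            if (string.IsNullOrWhiteSpace(line))
            {
                continue; // skip blank lines between records
            }

            using (var reader = new MongoDB.Bson.IO.JsonReader(line))
            {
                var context = BsonDeserializationContext.CreateRoot(reader);
                yield return Serializer.Deserialize(context);
            }
        }
    }
}
```

The translator can then consume `RecordedChangeStream.ReadAll(path)` wherever it previously consumed the result of `InfiniteWatch`, since both return `IEnumerable<ChangeStreamDocument<BsonDocument>>`.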


Hi @Jeremy_Buch,

Glad that you’ve found a solution!

