How to recover a change stream from a document that won't deserialize

We have an audit log implementation (C#) that uses a change stream with various filters to watch update & replace operations. The audit log uses pre-images to generate a diff of each update and persists it.
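Roughly, the setup looks like the sketch below (collection names, the filter, and option values are simplified placeholders, not our real code):

```csharp
using MongoDB.Bson;
using MongoDB.Driver;

var client = new MongoClient("mongodb://localhost:27017");
var collection = client.GetDatabase("app").GetCollection<BsonDocument>("orders");

// Only watch update & replace operations (illustrative filter).
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(c => c.OperationType == ChangeStreamOperationType.Update
             || c.OperationType == ChangeStreamOperationType.Replace);

var options = new ChangeStreamOptions
{
    // Pre-images must also be enabled on the collection (changeStreamPreAndPostImages).
    FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable
};

using var cursor = collection.Watch(pipeline, options);
foreach (var change in cursor.ToEnumerable())
{
    // Diff change.FullDocumentBeforeChange against the update and persist the audit entry.
}
```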

While testing the resilience of the implementation, I encountered a scenario that fails at the driver level.

When a document is updated to a size close to the 16 MB BSON threshold, the update is accepted, but the resulting change stream event exceeds the limit once the various additional fields are included. This causes the driver to throw an exception when attempting to process the message:

MongoDB.Driver.MongoCommandException: Command aggregate failed: PlanExecutor error during aggregation :: caused by :: BSONObj size: 29749890 (0x1C5F282) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "82631AFEEF000000032B022C0100296E5A1004602205B9156343C2AADFC6E485D4A71446645F69640064631A0BEE6B2B2CADFE192D310004" }.
         at MongoDB.Driver.Core.WireProtocol.CommandUsingCommandMessageWireProtocol`1.ProcessResponse(ConnectionId connectionId, CommandMessage responseMessage)
         at MongoDB.Driver.Core.WireProtocol.CommandUsingCommandMessageWireProtocol`1.ExecuteAsync(IConnection connection, CancellationToken cancellationToken)
         at MongoDB.Driver.Core.Servers.Server.ServerChannel.ExecuteProtocolAsync[TResult](IWireProtocol`1 protocol, ICoreSession session, CancellationToken cancellationToken)
         at MongoDB.Driver.Core.Operations.RetryableReadOperationExecutor.ExecuteAsync[TResult](IRetryableReadOperation`1 operation, RetryableReadContext context, CancellationToken cancellationToken)
         at MongoDB.Driver.Core.Operations.ReadCommandOperation`1.ExecuteAsync(RetryableReadContext context, CancellationToken cancellationToken)
         at MongoDB.Driver.Core.Operations.AggregateOperation`1.ExecuteAsync(RetryableReadContext context, CancellationToken cancellationToken)
         at MongoDB.Driver.Core.Operations.ChangeStreamOperation`1.ExecuteAsync(IReadBinding binding, CancellationToken cancellationToken)

As the change stream implementation is watching changes, we ideally do not want to miss any updates. We therefore use ResumeAfter with the ResumeToken._data field of the last successfully processed event. Because the offending event cannot be deserialised by the driver, we can never obtain the bad message's resume token to skip over it, causing continuous failure.
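In code, the resume logic is essentially the following, continuing from the setup sketch above (the token/audit persistence helpers are placeholders):

```csharp
// Resume from the last successfully processed event.
BsonDocument lastToken = LoadLastProcessedResumeToken(); // placeholder persistence helper

var options = new ChangeStreamOptions
{
    FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable,
    ResumeAfter = lastToken
};

using var cursor = collection.Watch(pipeline, options);
foreach (var change in cursor.ToEnumerable())
{
    PersistAuditEntry(change);                        // placeholder
    SaveLastProcessedResumeToken(change.ResumeToken); // change.ResumeToken is the event's _id
}
```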

Is there any way to get a change stream event's resume token without loading the message? (Perhaps we could use an aggregate to project just the token?)

Hi, @Anthony_Halliday,

I understand that you’re having a problem with returned ChangeStreamDocument<T> objects exceeding 16 MB. This is a limitation of change streams because the returned document is a BSON document itself and must fit within the 16 MB BSON limit. This limit can be encountered more frequently when requesting pre-/post-images of the affected document in the change stream.

Since the change stream document cannot be parsed, you cannot access its _id, which is the resume token. You can, however, call cursor.GetResumeToken() even after the change stream throws.
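For example (an untested sketch, continuing from your watch setup; the persistence helper is a placeholder):

```csharp
using var cursor = collection.Watch(pipeline, options);
try
{
    foreach (var change in cursor.ToEnumerable())
    {
        SaveLastProcessedResumeToken(change.ResumeToken); // placeholder persistence helper
    }
}
catch (MongoCommandException)
{
    // The cursor can still report its position even though the batch could not be returned.
    BsonDocument token = cursor.GetResumeToken();
    SaveLastProcessedResumeToken(token);
}
```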

Note that the resume token returned is probably that of the change stream event that exceeded 16 MB. Thus you would need some additional logic to skip over the offending change stream document, such as restarting the change stream without pre-/post-images, or including a pipeline that performs a $project to omit fields and reduce the change stream event to less than 16 MB.
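For instance, a resumed stream that projects out the pre-image might look like this (a sketch; lastToken is the token recovered above):

```csharp
// Resume past the oversized event with a slimmed-down pipeline that drops the
// pre-image so the event fits under 16 MB. Do not project out _id (the resume
// token), or the stream will no longer be resumable.
PipelineDefinition<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>> slimPipeline =
    new[]
    {
        new BsonDocument("$project", new BsonDocument("fullDocumentBeforeChange", 0))
    };

var slimOptions = new ChangeStreamOptions { ResumeAfter = lastToken };
using var slimCursor = collection.Watch(slimPipeline, slimOptions);
// Read past the offending event, persist its resume token, then restart the
// original change stream (with pre-images) from that token.
```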

Another potential solution would involve tracking the last successful change.ClusterTime (a BsonTimestamp) and using that with ChangeStreamOptions.StartAtOperationTime after incrementing the BsonTimestamp enough to step past the >16 MB change stream document. A BsonTimestamp consists of a Timestamp (seconds since the Unix epoch) and an Increment (a monotonically increasing counter of operations within a single second).
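A rough sketch of that approach (how far to advance the timestamp is an assumption; "enough" depends on where the oversized event sits, and LoadLastClusterTime is a placeholder for wherever change.ClusterTime was saved):

```csharp
// Restart from an operation time just past the last successful event.
BsonTimestamp lastClusterTime = LoadLastClusterTime();

// Advance the counter; +1 is illustrative, you may need to advance further to
// step past the oversized event.
var startAt = new BsonTimestamp(lastClusterTime.Timestamp, lastClusterTime.Increment + 1);

var options = new ChangeStreamOptions
{
    StartAtOperationTime = startAt,
    FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable
};

using var cursor = collection.Watch(pipeline, options);
```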

Hopefully that provides you with some ideas of how to work around this issue.

Sincerely,
James
