We have an Audit log implementation (C#) that uses a change stream with various filters to watch change & replace operations. The audit log uses pre images to generate a diff of an update & persist.
While testing the resistance of the implementation I encounter a scenario that fails at the driver level.
When updating a document to close to the 16mb Bson threshold, the update is accepted but the resulting change stream object surpasses the limit with the various addition fields included. This is resulting in the driver throwing an exception when attempting to process the message:
ongoDB.Driver.MongoCommandException: Command aggregate failed: PlanExecutor error during aggregation :: caused by :: BSONObj size: 29749890 (0x1C5F282) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "82631AFEEF000000032B022C0100296E5A1004602205B9156343C2AADFC6E485D4A71446645F69640064631A0BEE6B2B2CADFE192D310004" }.
at MongoDB.Driver.Core.WireProtocol.CommandUsingCommandMessageWireProtocol`1.ProcessResponse(ConnectionId connectionId, CommandMessage responseMessage)
at MongoDB.Driver.Core.WireProtocol.CommandUsingCommandMessageWireProtocol`1.ExecuteAsync(IConnection connection, CancellationToken cancellationToken)
at MongoDB.Driver.Core.Servers.Server.ServerChannel.ExecuteProtocolAsync[TResult](IWireProtocol`1 protocol, ICoreSession session, CancellationToken cancellationToken)
at MongoDB.Driver.Core.Operations.RetryableReadOperationExecutor.ExecuteAsync[TResult](IRetryableReadOperation`1 operation, RetryableReadContext context, CancellationToken cancellationToken)
at MongoDB.Driver.Core.Operations.ReadCommandOperation`1.ExecuteAsync(RetryableReadContext context, CancellationToken cancellationToken)
at MongoDB.Driver.Core.Operations.AggregateOperation`1.ExecuteAsync(RetryableReadContext context, CancellationToken cancellationToken)
at MongoDB.Driver.Core.Operations.ChangeStreamOperation`1.ExecuteAsync(IReadBinding binding, CancellationToken cancellationToken)
As the change stream implementation is watching changes, we ideally do not want to miss any updates. We, therefore, use ResumeAfter with the ResumeToken._data
field of the last successfully processed event. As the event cannot be deserialised by the driver, we can never get the bad messages resumption token to skip over it causing continuous failure.
Are there any ways to get a change stream event resume token without loading the message? (Perhaps we could use an aggregate to project just the token?)