Change Stream issue

Hi Team,

I am facing an issue with ChangeStream.
I am using ChangeStream to send an email alert whenever a document is deleted from a MongoDB collection on the backend; however, for bulk deletions, emails are triggering.
Please help me fix this issue, or let me know if there is an alternative to ChangeStream.

Below is my code:

var options = new ChangeStreamOptions();
options.FullDocument = ChangeStreamFullDocumentOption.UpdateLookup;
options.FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable;
using (var cursor = collection.Watch(options))
{
    while (cursor.MoveNext())
    {
        // Sending E-mail code
    }
}

Thanks,
Lalitha.C

What issue? Are you saying you don’t want to be notified of bulk delete events?

Apologies. The actual issue is that mails are not triggering in the case of bulk deletion.
Let's say I delete 50 records from the collection at once: only 20 mails are triggered, and the query is not killed, which leads to a deadlock.

My requirement: all 50 mails need to be sent, and the query should be killed.
Please note that the count depends on the number of records deleted; it can be more than 1000.

I found that there is a limitation for ChangeStream:
“Change stream response documents must adhere to the 16MB BSON document limit. Depending on the size of documents in the collection against which you open a change stream, notifications may fail if the resulting notification document exceeds the 16MB limit.”

Please let me know if there is any solution to this issue.

Thanks,
Lalitha.C

What do you mean by this?

This question says that, in the case of bulk writes, you will receive one notification for each write event. So ideally you should be notified 50 times, unless the pre-image of a deleted document pushes the event over the 16MB limit.
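
One more thing worth checking, on the assumption that the e-mail code runs once per MoveNext() call (that part of the posted code is elided, so this is a guess): in the .NET driver each MoveNext() advances to the next batch, and cursor.Current may hold several change events. A minimal sketch that handles every event in every batch follows; the class name, the method name, and the notify callback are hypothetical and only stand in for the e-mail code.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

public static class ChangeStreamBatches
{
    // Hypothetical helper: invokes notify once per change event,
    // not once per MoveNext() call.
    public static void NotifyOnEveryEvent(
        IMongoCollection<BsonDocument> collection,
        Action<ChangeStreamDocument<BsonDocument>> notify)
    {
        using (var cursor = collection.Watch())
        {
            // Each MoveNext() moves to the next batch of events.
            // A batch can be empty or contain many events.
            while (cursor.MoveNext())
            {
                foreach (var change in cursor.Current)
                {
                    notify(change); // e.g. send one e-mail per event
                }
            }
        }
    }
}
```

If the mail was previously sent once per loop iteration, a bulk delete of 50 documents delivered in a few batches would produce fewer than 50 mails, which would match the symptom described.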

If the limit is exceeded, the docs don’t offer any workaround; you will have to reduce the size of the change events.

To limit the event size, you can:

  • Limit the document size to 8 megabytes. You can request pre- and post-images simultaneously in the change stream output if other change stream event fields like updateDescription are not large.
  • Request only post-images in the change stream output for documents up to 16 megabytes if other change stream event fields like updateDescription are not large.
  • Request only pre-images in the change stream output for documents up to 16 megabytes if:
    • document updates affect only a small fraction of the document structure or content, and
    • do not cause a replace change event. A replace event always includes the post-image.
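
For the delete alerts in this thread, the "request only pre-images" option could look roughly like the sketch below. It assumes pre-images are enabled on the collection (changeStreamPreAndPostImages, MongoDB 6.0 or later) and a driver version that exposes FullDocumentBeforeChange; DeleteOnlyStream and the onDelete callback are hypothetical names, not from the posted code.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

public static class DeleteOnlyStream
{
    // Server-side filter: only delete events reach the client.
    public static PipelineDefinition<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>> BuildPipeline()
    {
        return new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
            .Match(change => change.OperationType == ChangeStreamOperationType.Delete);
    }

    public static void Watch(
        IMongoCollection<BsonDocument> collection,
        Action<BsonValue, BsonDocument> onDelete)
    {
        // FullDocument is left at its default, so no post-image is requested;
        // only the pre-image is asked for, and only when the server has one.
        var options = new ChangeStreamOptions
        {
            FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable
        };

        using (var cursor = collection.Watch(BuildPipeline(), options))
        {
            while (cursor.MoveNext())
            {
                foreach (var change in cursor.Current)
                {
                    // The pre-image is null when the server has none stored.
                    onDelete(change.DocumentKey["_id"], change.FullDocumentBeforeChange);
                }
            }
        }
    }
}
```

Filtering on the server also means insert and update events are never shipped to the application, which keeps the stream's output down to what the alerting code actually uses.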

Sure … Thank you Kobe.

Hi Kobe,

I need help optimizing my change stream code.
If possible, please go through my code and suggest improvements.
My main concern is that, because we keep a cursor and a connection to the DB open, it uses more memory and CPU utilization becomes high, and other applications are getting stuck.
Below is my code:

public async Task RealtionalCollectionCollectionChange(CancellationToken cancellationToken)
        {
            var options = new ChangeStreamOptions
            {   
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
                FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable
            };            
            string logHistory = string.Empty;          

            using (var cursor = await collection.WatchAsync(options, cancellationToken))
            {
                while (await cursor.MoveNextAsync(cancellationToken))
                {                    
                    if (cancellationToken.IsCancellationRequested)
                    {
                        break;
                    }

                    foreach (var change in cursor.Current)
                    {
                        if (change.OperationType == ChangeStreamOperationType.Invalidate)
                        {
                            _logger.LogWarning("Change stream cursor has been invalidated");
                            _createLogService.CreateLogs("Error","Change stream cursor has been invalidated");
                            break;
                        }

                        var key = change.DocumentKey.GetValue("_id").ToString();

                        switch (change.OperationType)
                        {
                            case ChangeStreamOperationType.Insert:
                                await InsertIntoHistoryCollection(change);
                                await TriggerEmail(change);
                                break;

                            case ChangeStreamOperationType.Delete:
                                _logger.LogInformation("{Key} has been deleted from Mongo DB", key);
                                logHistory = key + " has been deleted from Mongo DB";
                                // key is already a string, so it can be parsed directly
                                var filter = Builders<BsonDocument>.Filter.Eq("_id", ObjectId.Parse(key));
                                var document = await collectionHistory.Find(filter).FirstOrDefaultAsync(cancellationToken);

                                try
                                {
                                    await _mailService.SendEmail(change, document, logHistory);
                                }
                                catch (Exception ex)
                                {
                                    _logger.LogError(ex, "An error occurred while sending email for {Key} for operation type {OperationType}", key, change.OperationType);
                                    // Interpolate the values; plain strings do not expand {Key}-style placeholders
                                    _createLogService.CreateLogs("Error", $"An error occurred while sending email for {key} for operation type {change.OperationType}");
                                }
                                break;
                        }
                    }
                }
            }
        }

Thanks,
Lalitha.C