UpdateOne with upsert on two threads throws MongoDuplicateKeyException

Summary
I have a document with several fields that are updated independently of each other (e.g. two measured values).
Sometimes, when both update methods run at the same time, the initial upsert fails in one of them and a MongoDuplicateKeyException is thrown.
This only seems to happen when the filter contains more criteria than just the document’s Id.
Once the document has been created, updates from both methods always succeed.

According to the MongoDB documentation, updateOne is atomic. I therefore expected that an upsert would never fail.

Is this a bug in MongoDB or in the C# driver?
And if not, what is the best way to avoid this behavior?
I could repeat the upsert whenever the exception is thrown, but I hope there’s a better solution.

Here’s my environment:

  • MongoDB hosted in Kubernetes, using the following Docker image: mongodb:6.0.3-debian-11-r9
  • C# Driver: 2.27
  • .NET 8

Code example
Here’s an example of a document where FirstMeasureValue is updated by one method while SecondMeasureValue is updated by another:

public record MeasureResultsDocument
{
    public int Id { get; init; }

    public decimal FirstMeasureValue { get; init; }
    public DateTimeOffset? FirstMeasureValueChangedAt { get; init; }

    public decimal SecondMeasureValue { get; init; }
    public DateTimeOffset? SecondMeasureValueChangedAt { get; init; }
}

The goal is to overwrite a measure value only when the stored value is older than the newly measured one. The two measure values have to be updated independently of each other.
Unfortunately, a MongoDuplicateKeyException is sometimes thrown when both upserts happen at the same time for the same Id.

Here’s some example code that creates 250 MeasureResults documents with dummy data:

using MongoDB.Driver;
using MongoDbSerialization.Documents;

namespace MongoDbSerialization.Services;

internal class MeasureValueMongoDbService
{
    private const string DatabaseName = "my-database";
    private const string CollectionName = "MeasureResults";
    private const string Url = "mongodb://my-server:my-password@localhost:27017";

    private readonly MongoClient _mongoClient;

    internal MeasureValueMongoDbService() =>
        _mongoClient = new MongoClient(Url);

    internal async Task SaveManyDummyMeasureResults()
    {
        var tasks = new List<Task>();
        for (var id = 1; id <= 250; ++id)
        {
            tasks.Add(item: UpdateFirstMeasureValue(id, measureValue: id + id / 100m, DateTimeOffset.UtcNow));
            tasks.Add(
                item: UpdateSecondMeasureValue(id, measureValue: id - id / 100m, changedAt: DateTimeOffset.UtcNow.AddHours(hours: 1)));
        }

        try
        {
            await Task.WhenAll(tasks);
            Console.WriteLine(value: "Completed");
        }
        catch (Exception ex)
        {
            Console.WriteLine(value: $"Upsert failed: {ex.Message}");
        }
    }

    private async Task UpdateFirstMeasureValue(int measureResultsId, decimal measureValue, DateTimeOffset changedAt)
    {
        var filter = CreateFilter(
            measureResultsId,
            changedAtFieldName: nameof(MeasureResultsDocument.FirstMeasureValueChangedAt),
            changedAt);
        var updateDefinition = Builders<MeasureResultsDocument>
                               .Update.SetOnInsert(measureResults => measureResults.Id, measureResultsId)
                               .Set(measureResults => measureResults.FirstMeasureValue, measureValue)
                               .Set(measureResults => measureResults.FirstMeasureValueChangedAt, changedAt);
        await UpdateMeasureResults(measureResultsId, filter, updateDefinition);
    }

    private async Task UpdateSecondMeasureValue(int measureResultsId, decimal measureValue, DateTimeOffset changedAt)
    {
        var filter = CreateFilter(
            measureResultsId,
            changedAtFieldName: nameof(MeasureResultsDocument.SecondMeasureValueChangedAt),
            changedAt);
        var updateDefinition = Builders<MeasureResultsDocument>
                               .Update.SetOnInsert(measureResults => measureResults.Id, measureResultsId)
                               .Set(measureResults => measureResults.SecondMeasureValue, measureValue)
                               .Set(measureResults => measureResults.SecondMeasureValueChangedAt, changedAt);

        await UpdateMeasureResults(measureResultsId, filter, updateDefinition);
    }

    private async Task UpdateMeasureResults(
        int measureResultId,
        FilterDefinition<MeasureResultsDocument> filter,
        UpdateDefinition<MeasureResultsDocument> updateDefinition)
    {
        var collection = await GetCollection();

        try
        {
            await collection.UpdateOneAsync(filter, updateDefinition, options: new UpdateOptions { IsUpsert = true });
        }
        catch (MongoDuplicateKeyException)
        {
            Console.WriteLine(value: $"Measure result with Id {measureResultId} already exists");
        }
    }

    private static FilterDefinition<MeasureResultsDocument> CreateFilter(
        int measureResultsId,
        string changedAtFieldName,
        DateTimeOffset changedBefore)
    {
        var idFilter = Builders<MeasureResultsDocument>.Filter.Eq(measureResults => measureResults.Id, measureResultsId);

        var changedAtNotSetFilter = Builders<MeasureResultsDocument>.Filter.Exists(changedAtFieldName, exists: false);
        var changedAtNullFilter = Builders<MeasureResultsDocument>.Filter.Eq(
            field: (FieldDefinition<MeasureResultsDocument, DateTimeOffset?>)changedAtFieldName,
            value: null);
        var changedAtBeforeFilter = Builders<MeasureResultsDocument>.Filter.Lt(changedAtFieldName, changedBefore);

        return Builders<MeasureResultsDocument>.Filter.And(
            idFilter,
            Builders<MeasureResultsDocument>.Filter.Or(
                changedAtNotSetFilter,
                changedAtNullFilter,
                changedAtBeforeFilter));
    }

    private async Task<IMongoCollection<MeasureResultsDocument>> GetCollection()
    {
        var database = _mongoClient.GetDatabase(DatabaseName);

        var collection = database.GetCollection<MeasureResultsDocument>(CollectionName);
        await collection.Indexes.CreateOneAsync(
            model: new CreateIndexModel<MeasureResultsDocument>(
                keys: new IndexKeysDefinitionBuilder<MeasureResultsDocument>().Ascending(field: nameof(MeasureResultsDocument.Id))));

        return collection;
    }
}

Hi, @AnRy,

Welcome to the MongoDB Community Forums. I understand that you’re encountering a MongoDuplicateKeyException when performing simultaneous upserts.

In MongoDB, the _id field is required and must be unique. If one is not specified, the driver (or server) will automatically generate an _id for you, using an ObjectId by default. You are encountering a MongoDuplicateKeyException because your application is trying to insert two documents with the same _id (which is mapped to Id in C#).

The duplicate inserts are caused by the upserts. An upsert performs an update if the filter matches one or more documents, and an insert if the filter does not match any documents. Note that whether an insert or an update is performed depends on whether the filter criteria match, not on whether the _id exists.

I would suggest changing your application logic to only match on _id. You will have to determine how to implement your additional filter criteria separately from the upsert filter. Hope that helps!

Sincerely,
James

Hello @James_Kovacs

Thanks for your answer.

If I understand your suggestion correctly, I would first have to perform an upsert that filters only on the Id, to make sure that the document exists, and then perform the update with the combined filter.
That would mean two steps every time I want to update a document.
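
For illustration, the two steps could look roughly like the sketch below. It is written against BsonDocument to stay self-contained. TwoStepUpdateSketch, EnsureThenUpdateAsync and IsDuplicateKey are made-up names, and the sketch assumes the caller passes the "stored value is older" part of the filter and the update separately. The check for MongoWriteException is an assumption as well: in some setups a duplicate key may be reported through that type instead of MongoDuplicateKeyException.

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical helper for illustration; not part of the MongoDB driver.
internal static class TwoStepUpdateSketch
{
    // Returns true if the conditional update in step 2 matched the document.
    internal static async Task<bool> EnsureThenUpdateAsync(
        IMongoCollection<BsonDocument> collection,
        int id,
        FilterDefinition<BsonDocument> isOlderFilter,
        UpdateDefinition<BsonDocument> update)
    {
        var idFilter = Builders<BsonDocument>.Filter.Eq("_id", id);

        try
        {
            // Step 1: upsert that filters on _id only, so the document exists afterwards.
            await collection.UpdateOneAsync(
                idFilter,
                Builders<BsonDocument>.Update.SetOnInsert("_id", id),
                new UpdateOptions { IsUpsert = true });
        }
        catch (MongoException ex) when (IsDuplicateKey(ex))
        {
            // Another writer inserted the document first; that is all step 1 needs.
        }

        // Step 2: plain update (no upsert) with the combined filter.
        var result = await collection.UpdateOneAsync(idFilter & isOlderFilter, update);
        return result.MatchedCount > 0;
    }

    // Assumption: a duplicate key may surface as either exception type.
    internal static bool IsDuplicateKey(Exception ex) =>
        ex is MongoDuplicateKeyException ||
        (ex is MongoWriteException w && w.WriteError?.Category == ServerErrorCategory.DuplicateKey);
}
```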

Therefore, I think the most efficient way would be to perform the upsert with the combined filter and, if a MongoDuplicateKeyException is thrown, repeat the same operation, which will then perform a successful update.
With this approach, two steps should only be needed in the very few cases where two threads perform the initial upsert at the same time.
In all other cases, a single step is enough.
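
A minimal sketch of this retry idea follows. UpsertRetrySketch, UpsertIfNewerAsync and IsDuplicateKey are made-up names, and the MongoWriteException check is an assumption in case the duplicate key is reported through that type. The sketch deviates from a literal "repeat the same operation" in one respect: the second attempt runs without upsert. If the stored timestamp is already newer than the new one, the combined filter matches nothing even though the document exists, so an unchanged upsert would try to insert again and fail with the same duplicate key error.

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Driver;

// Hypothetical helper for illustration; not part of the MongoDB driver.
internal static class UpsertRetrySketch
{
    // Returns true if a document was inserted or matched by the update,
    // false if the existing document did not match the filter
    // (for example because its stored timestamp is already newer).
    internal static async Task<bool> UpsertIfNewerAsync<TDocument>(
        IMongoCollection<TDocument> collection,
        FilterDefinition<TDocument> filter,
        UpdateDefinition<TDocument> update)
    {
        UpdateResult result;
        try
        {
            // Normal case: a single round trip with upsert enabled.
            result = await collection.UpdateOneAsync(
                filter, update, new UpdateOptions { IsUpsert = true });
        }
        catch (MongoException ex) when (IsDuplicateKey(ex))
        {
            // A document with this _id already exists, so retry as a plain update.
            // Any $setOnInsert part of the update is ignored in this case.
            result = await collection.UpdateOneAsync(
                filter, update, new UpdateOptions { IsUpsert = false });
        }

        return result.MatchedCount > 0 || result.UpsertedId != null;
    }

    // Assumption: a duplicate key may surface as either exception type.
    internal static bool IsDuplicateKey(Exception ex) =>
        ex is MongoDuplicateKeyException ||
        (ex is MongoWriteException w && w.WriteError?.Category == ServerErrorCategory.DuplicateKey);
}
```

In the example code from my question, UpdateMeasureResults could call UpsertIfNewerAsync(collection, filter, updateDefinition) instead of catching the exception itself. A return value of false would then mean that nothing was written for that call.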

Sincerely,
Andreas