Summary
I have a document with different fields, which are updated independently (e.g.: two measure values).
Sometimes, when both methods run at the same time, the initial upsert fails in one method and a MongoDuplicateKeyException is thrown.
This seems only to happen, when the filter has more criteria than only the document’s Id.
When the document has been created once, updates of both methods are always successful.
As I could read in the mongo DB documentation, updateOne is atomic. Therefore I expected, that an upsert should never fail.
Is that a bug in the mongo DB or in te C# driver?
And if not, what is the best way to get rid of this behavior?
I could repeat the upsert operation, when the exception was thrown, but I hope that there’s a better solution.
Here’s my environment:
- mongo DB hosted in kubernetes by using the following docker image: mongodb:6.0.3-debian-11-r9
- C# Driver: 2.27
- .Net 8 Core
Code example
Here’s an example of a document, where FirstMeasureValue ist updated by one method while SecondMeasureValue is updated by another method:
public record MeasureResultsDocument
{
public int Id { get; init; }
public decimal FirstMeasureValue { get; init; }
public DateTimeOffset? FirstMeasureValueChangedAt { get; init; }
public decimal SecondMeasureValue { get; init; }
public DateTimeOffset? SecondMeasureValueChangedAt { get; init; }
}
The goal is to set each measure value when it is older than the new measured value. Both measure values have to be updated independent of each other.
But unfortunately sometimes a MongoDuplicateKeyException is thrown when both upserts happen at the same time on the same Id.
Here’s an example code, which creates 250 MeasureResults with dummy data:
using MongoDB.Driver;
using MongoDbSerialization.Documents;
namespace MongoDbSerialization.Services;
internal class MeasureValueMongoDbService
{
private const string DatabaseName = "my-database";
private const string CollectionName = "MeasureResults";
private const string Url = "mongodb://my-server:my-password@localhost:27017";
private readonly MongoClient _mongoClient;
internal MeasureValueMongoDbService() =>
_mongoClient = new MongoClient(Url);
internal async Task SaveManyDummyMeasureResults()
{
var tasks = new List<Task>();
for (var id = 1; id <= 250; ++id)
{
tasks.Add(item: UpdateFirstMeasureValue(id, measureValue: id + id / 100m, DateTimeOffset.UtcNow));
tasks.Add(
item: UpdateSecondMeasureValue(id, measureValue: id - id / 100m, changedAt: DateTimeOffset.UtcNow.AddHours(hours: 1)));
}
try
{
await Task.WhenAll(tasks);
Console.WriteLine(value: "Completed");
}
catch (Exception ex)
{
Console.WriteLine(value: $"Upsert failed: {ex.Message}");
}
}
private async Task UpdateFirstMeasureValue(int measureResultsId, decimal measureValue, DateTimeOffset changedAt)
{
var filter = CreateFilter(
measureResultsId,
changedAtFieldName: nameof(MeasureResultsDocument.FirstMeasureValueChangedAt),
changedAt);
var updateDefinition = Builders<MeasureResultsDocument>
.Update.SetOnInsert(measureResults => measureResults.Id, measureResultsId)
.Set(measureResults => measureResults.FirstMeasureValue, measureValue)
.Set(measureResults => measureResults.FirstMeasureValueChangedAt, changedAt);
await UpdateMeasureResults(measureResultsId, filter, updateDefinition);
}
private async Task UpdateSecondMeasureValue(int measureResultsId, decimal measureValue, DateTimeOffset changedAt)
{
var filter = CreateFilter(
measureResultsId,
changedAtFieldName: nameof(MeasureResultsDocument.SecondMeasureValueChangedAt),
changedAt);
var updateDefinition = Builders<MeasureResultsDocument>
.Update.SetOnInsert(measureResults => measureResults.Id, measureResultsId)
.Set(measureResults => measureResults.SecondMeasureValue, measureValue)
.Set(measureResults => measureResults.SecondMeasureValueChangedAt, changedAt);
await UpdateMeasureResults(measureResultsId, filter, updateDefinition);
}
private async Task UpdateMeasureResults(
int measureResultId,
FilterDefinition<MeasureResultsDocument> filter,
UpdateDefinition<MeasureResultsDocument> updateDefinition)
{
var collection = await GetCollection();
try
{
await collection.UpdateOneAsync(filter, updateDefinition, options: new UpdateOptions { IsUpsert = true });
}
catch (MongoDuplicateKeyException)
{
Console.WriteLine(value: $"Measure result with Id {measureResultId} already exists");
}
}
private static FilterDefinition<MeasureResultsDocument> CreateFilter(
int measureResultsId,
string changedAtFieldName,
DateTimeOffset changedBefore)
{
var idFilter = Builders<MeasureResultsDocument>.Filter.Eq(measureResults => measureResults.Id, measureResultsId);
var changedAtNotSetFilter = Builders<MeasureResultsDocument>.Filter.Exists(changedAtFieldName, exists: false);
var changedAtNullFilter = Builders<MeasureResultsDocument>.Filter.Eq(
field: (FieldDefinition<MeasureResultsDocument, DateTimeOffset?>)changedAtFieldName,
value: null);
var changedAtBeforeFilter = Builders<MeasureResultsDocument>.Filter.Lt(changedAtFieldName, changedBefore);
return Builders<MeasureResultsDocument>.Filter.And(
idFilter,
Builders<MeasureResultsDocument>.Filter.Or(
changedAtNotSetFilter,
changedAtNullFilter,
changedAtBeforeFilter));
}
private async Task<IMongoCollection<MeasureResultsDocument>> GetCollection()
{
var database = _mongoClient.GetDatabase(DatabaseName);
var collection = database.GetCollection<MeasureResultsDocument>(CollectionName);
await collection.Indexes.CreateOneAsync(
model: new CreateIndexModel<MeasureResultsDocument>(
keys: new IndexKeysDefinitionBuilder<MeasureResultsDocument>().Ascending(field: nameof(MeasureResultsDocument.Id))));
return collection;
}
}