Since MongoDB 4.2, updates can be written as an aggregation pipeline. In particular, this makes it possible to use the system variables $$NOW and $$CLUSTER_TIME. So, according to the documentation, there are now two ways of setting the current timestamp: the $currentDate operator or the $$CLUSTER_TIME variable.
But for some reason these two methods behave differently in some cases, and the docs do not make this clear at all.
For example, I want to store some entities with a timestamp field that has to be unique across the entire collection.
public class SomeDTO
{
    [BsonId]
    public Guid Id { get; set; }

    [BsonElement("timestamp")]
    public BsonTimestamp Timestamp { get; set; }
}
Now, let’s write a lot of objects concurrently:
[Test]
public void Set_Unique_Timestamp_On_UpsertOne_With_ClusterTime_SysVar()
{
    // creating test.test collection with unique timestamp index
    var mongoClient = LifetimeScope.Resolve<MongoClientProvider>().Get();
    var collection = mongoClient.GetDatabase("test").GetCollection<SomeDTO>("test");
    collection.Indexes.CreateOne(
        new CreateIndexModel<SomeDTO>(
            Builders<SomeDTO>.IndexKeys.Ascending(x => x.Timestamp),
            new CreateIndexOptions { Unique = true }));

    // upsert operation
    async Task UpsertOneAsync(Guid id)
    {
        var pipeline = new[] { new BsonDocument("$set", new BsonDocument("timestamp", "$$CLUSTER_TIME")) }; // <---- causing troubles
        await collection.UpdateOneAsync(
                Builders<SomeDTO>.Filter.Eq(x => x.Id, id),
                Builders<SomeDTO>.Update.Pipeline(pipeline),
                new UpdateOptions { IsUpsert = true })
            .ConfigureAwait(false);
    }

    // running tasks in parallel
    var tasks = Enumerable.Range(0, 150)
        .Select(_ => UpsertOneAsync(Guid.NewGuid()));
    Assert.DoesNotThrowAsync(() => Task.WhenAll(tasks));
}
The test fails with the error below:
A write operation resulted in an error. WriteError: { Category : "DuplicateKey", Code : 11000, Message : "E11000 duplicate key error collection: test.test index: timestamp_1 dup key: { timestamp: Timestamp(1666297782, 58) }"
But if I implement the UpsertOneAsync method with the $currentDate operator instead,
async Task UpsertOneAsync(Guid id)
{
    await collection.UpdateOneAsync(
            Builders<SomeDTO>.Filter.Eq(x => x.Id, id),
            Builders<SomeDTO>.Update.CurrentDate(x => x.Timestamp, UpdateDefinitionCurrentDateType.Timestamp),
            new UpdateOptions { IsUpsert = true })
        .ConfigureAwait(false);
}
the test passes.
Why does the $$CLUSTER_TIME system variable return equal values for parallel upserts? And why doesn’t $currentDate?