FindOneAndUpdate internal working, locking mechanism and impact on performance on multiple parallel calls on same collection

We store certain events in a MongoDB collection that must be fetched and processed. We want each record to be processed exactly once (at-least-once is also acceptable), with some parallelism: multiple instances, each running multiple threads, process the documents.

To achieve this, I plan on using a lease-based approach with locking to ensure each record is processed only once. Another method I can think of is to create a change stream and publish events to Kafka and consume and process the Kafka events but this seems a bit too much for this. I am sharing a sample code for the lease-based approach below. In the lease-based approach, I am fetching a record that is not locked using the findOneAndUpdate method and updating the record with locked_by and lease_expires_at so other threads/processes don’t process the same message.

Does calling findOneAndUpdate from multiple instances, each with multiple threads fetching records in parallel, affect performance? Is findOneAndUpdate safe for multiple threads to call concurrently? Is it possible for two calls to find the same record and both update it, so that two threads process the same request? What type of locking is used, and even though numerous threads query in parallel, will MongoDB execute them sequentially because of locking? What other options can be explored in this case?

Also, we are not currently planning to shard the database, and records will be deleted once they are processed successfully. Another service will insert new records, but there won’t be any queries on this collection other than those mentioned above.

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import org.bson.Document;

import java.util.Date;

public class EventProcessor {
    private static final int LEASE_TIMEOUT_SECONDS = 60;

    private final MongoCollection<Document> collection;
    private final String instanceId;

    public EventProcessor(MongoCollection<Document> collection, String instanceId) {
        this.collection = collection;
        this.instanceId = instanceId;
    }

    public void processEvents() throws InterruptedException {
        while (true) {
            // Acquire a lease on the next available document
            Document document = acquireLease();
            if (document == null) {
                // No documents available for processing, sleep and try again later
                Thread.sleep(1000);
                continue;
            }

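            try {
                // Process the document
                processDocument(document);
                // Delete on success so the document cannot be picked up again
                collection.deleteOne(new Document("_id", document.get("_id")));
            } finally {
                // Release the lease; after a successful delete this matches nothing,
                // after a failure it makes the document available for a retry
                releaseLease(document);
            }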
        }
    }

    private Document acquireLease() {
        Date now = new Date();
        // Match documents that are unlocked OR whose lease has expired, so a
        // document held by a crashed worker becomes claimable again
        Document updatedDocument = collection.findOneAndUpdate(
                new Document("$or", java.util.Arrays.asList(
                        new Document("locked_by", null),
                        new Document("lease_expires_at", new Document("$lt", now)))),
                new Document("$set", new Document("locked_by", instanceId)
                        .append("lease_expires_at", new Date(now.getTime() + LEASE_TIMEOUT_SECONDS * 1000L))),
                new FindOneAndUpdateOptions()
                        .returnDocument(ReturnDocument.AFTER)
        );
        return updatedDocument;
    }

    private void releaseLease(Document document) {
        collection.updateOne(
                new Document("_id", document.getObjectId("_id"))
                        .append("locked_by", instanceId),
                new Document("$set", new Document("locked_by", null)
                        .append("lease_expires_at", new Date()))
        );
    }

    private void processDocument(Document document) {
        // TODO: Implement your document processing logic here
        System.out.println("Processing document: " + document.toJson());
    }
}

Databases generally implement this kind of thing with a similar or identical approach. When a select-and-update with the same condition is performed by multiple clients, only one client's request can succeed, and all the others will fail. This is the guarantee given by “compare and swap”, an atomic operation.

Locking is definitely needed, since multiple threads are trying to update the same resource, whether or not it is a database item. All the other threads have to wait until the first one finishes, because each of these operations must run atomically.
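
To make the compare-and-swap guarantee concrete, here is a minimal in-memory sketch. It does not talk to MongoDB: the `AtomicReference` is only a hypothetical stand-in for one document's locked_by field, and `CasClaimDemo` and `raceForOneRecord` are names made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a single document's locked_by field (not MongoDB code).
class CasClaimDemo {
    // Lets `threads` workers race for one record and returns how many won the claim.
    static int raceForOneRecord(int threads) {
        AtomicReference<String> lockedBy = new AtomicReference<>(null);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            String workerId = "worker-" + i;
            pool.submit(() -> {
                try {
                    start.await(); // line all workers up so they race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Succeeds only if the field is still null at the moment of the swap
                if (lockedBy.compareAndSet(null, workerId)) {
                    winners.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + raceForOneRecord(5)); // prints "winners: 1"
    }
}
```

However many workers race, exactly one swap from null succeeds; the losers simply see `false`, which is a normal outcome rather than an error.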

In my project I also used a lease-like mechanism so that only one thread works on a particular document at a time.
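
A lease of this kind can be sketched in plain Java as follows. This is only an illustration under assumptions, not the code from that project: the `Lease` class and its methods are hypothetical, the fields mirror locked_by and lease_expires_at from the question, and `synchronized` stands in for the atomicity the database would provide.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical in-memory lease mirroring the locked_by / lease_expires_at fields.
class Lease {
    private String lockedBy;        // null means nobody holds the lease
    private Instant leaseExpiresAt; // null means the lease was never taken

    // Claims the lease if it is free or has expired; returns whether the claim succeeded.
    synchronized boolean tryAcquire(String workerId, Instant now, Duration ttl) {
        boolean free = lockedBy == null;
        boolean expired = leaseExpiresAt != null && leaseExpiresAt.isBefore(now);
        if (!free && !expired) {
            return false; // someone else holds a lease that is still valid
        }
        lockedBy = workerId;
        leaseExpiresAt = now.plus(ttl);
        return true;
    }

    // Releases the lease only if the caller is still its owner.
    synchronized boolean release(String workerId) {
        if (!workerId.equals(lockedBy)) {
            return false;
        }
        lockedBy = null;
        return true;
    }
}
```

The expiry check is what lets another worker take over a record whose owner crashed before releasing it, and the owner check in `release` keeps a slow worker from unlocking a record that has since been re-leased to someone else.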


What I wanted to know is this: if 10 records match the query criteria and 5 threads run the findOneAndUpdate operation at the same time, will all 5 calls succeed, with each thread getting one of the 10 available records? Or will all 5 threads initially pick the same record, so that whichever thread updates it first succeeds and the others fail to update it? If the other threads fail to update the record, will they look for another record that matches the query criteria and update that one, or just propagate the initial failure upstream?

@Kobe_W, you are saying that the other threads fail to update and propagate the error upstream. I guess they are expected to fail and not retry. Is there any documentation for this?

I believe the final result would be that all 5 threads succeed (assuming no update failure occurs), each one updating a different document. However, it’s not clear whether they initially try to pick the same record or automatically pick different ones. That probably also depends on the underlying locking type.
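
The expected outcome can be modelled in memory. The sketch below is not MongoDB's implementation: it assumes that a worker which loses the race for one record moves on to the next matching one, and `MultiClaimDemo`, `claimNext` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceArray;

// In-memory model: slot i holds the locked_by value of record i (null = unlocked).
class MultiClaimDemo {
    // Scans for an unlocked record and claims it atomically.
    // Returns the claimed index, or -1 when no unlocked record is left.
    static int claimNext(AtomicReferenceArray<String> lockedBy, String workerId) {
        for (int i = 0; i < lockedBy.length(); i++) {
            // Losing the swap is not an error: just try the next record
            if (lockedBy.get(i) == null && lockedBy.compareAndSet(i, null, workerId)) {
                return i;
            }
        }
        return -1;
    }

    // Starts `workers` threads that each claim at most one of `records` records.
    // Returns the set of record indexes that were claimed.
    static Set<Integer> run(int records, int workers) {
        AtomicReferenceArray<String> lockedBy = new AtomicReferenceArray<>(records);
        Set<Integer> claimed = ConcurrentHashMap.newKeySet();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            String workerId = "worker-" + w;
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all workers at once
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                int index = claimNext(lockedBy, workerId);
                if (index >= 0) {
                    claimed.add(index);
                }
            });
            threads.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claimed;
    }
}
```

In this model `run(10, 5)` always yields 5 distinct claimed records, while `run(1, 5)` yields a single one and the other four workers get -1, the counterpart of findOneAndUpdate returning null when no document matches.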

No error should be returned just because 4 threads lose the race to the first thread, since this is not an error. Rather, it’s expected behaviour.

This explains atomic updates in MongoDB, but all general-purpose databases are supposed to have similar or identical semantics.


When a select-and-update with the same condition is performed by multiple clients, only one client's request can succeed, and all the others will fail.

This differs from your example. It applies when only one such item exists and multiple threads compete for it (the failed threads get “no such document” back). If more than one item exists, multiple threads should succeed as well.
