We store certain events in a MongoDB collection, and they must be fetched and processed. We want to ensure each record is processed exactly once (at-least-once is also acceptable), and we want parallelism: multiple instances, each running multiple threads that process the documents.
To achieve this, I plan to use a lease-based approach with locking so that each record is processed only once. Another option I considered is to open a change stream, publish the events to Kafka, and then consume and process the Kafka events, but that seems like overkill here. Sample code for the lease-based approach is below. It fetches a record that is not locked using findOneAndUpdate and, in the same call, sets locked_by and lease_expires_at on it so that other threads/processes don't process the same record.
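To make the claim/lease semantics concrete, here is a minimal in-memory model of the lease approach. It assumes the "find one available record and lock it" step is atomic per record, as a single-document findOneAndUpdate is in MongoDB; here `synchronized` stands in for that guarantee. `LeaseStore`, `claim` and `release` are hypothetical names, and time is passed in explicitly so expiry can be tested. As an assumption about the intended semantics, a record counts as claimable when it is unlocked *or* its previous holder's lease has expired, so a record held by a crashed worker is eventually picked up again.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the events collection (not the MongoDB API).
// claim() plays the role of findOneAndUpdate: find one available record and lock it
// in a single atomic step (guaranteed here by `synchronized`).
class LeaseStore {
    /** Lease state for one record; lockedBy == null means "never locked or released". */
    static final class Lease {
        String lockedBy;
        long leaseExpiresAt;
    }

    private final Map<String, Lease> records = new LinkedHashMap<>();

    LeaseStore(List<String> ids) {
        for (String id : ids) {
            records.put(id, new Lease());
        }
    }

    /** Returns the id of the claimed record, or null if none is available at time `now`. */
    synchronized String claim(String owner, long now, long leaseMillis) {
        for (Map.Entry<String, Lease> e : records.entrySet()) {
            Lease lease = e.getValue();
            // Available = unlocked, or the previous holder's lease has expired
            if (lease.lockedBy == null || lease.leaseExpiresAt < now) {
                lease.lockedBy = owner;
                lease.leaseExpiresAt = now + leaseMillis;
                return e.getKey();
            }
        }
        return null;
    }

    /** Clears the lease only if `owner` still holds it; returns whether it did. */
    synchronized boolean release(String id, String owner) {
        Lease lease = records.get(id);
        if (lease == null || !owner.equals(lease.lockedBy)) {
            return false;
        }
        lease.lockedBy = null;
        return true;
    }
}
```

Note that `release` checks the owner, mirroring the `locked_by` condition in the question's `releaseLease`: once a lease has expired and been taken over, the original holder can no longer clear it.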
Does calling findOneAndUpdate in parallel from multiple threads across multiple instances affect performance? Is findOneAndUpdate safe to call concurrently from multiple threads? Is it possible for two calls to find the same record and both update it, resulting in two threads processing the same record? What type of locking does MongoDB use here, and even though many threads are querying in parallel, will MongoDB execute the queries sequentially because of that locking? What other options could be explored in this case?
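For the "can two calls grab the same record" question, it helps to know that MongoDB write operations are atomic at the level of a single document, and findOneAndUpdate modifies a single document. The sketch below models that guarantee in plain Java, with no MongoDB involved (`ClaimRace` is a hypothetical name): each record's lock is an `AtomicReference`, and a claim succeeds only through `compareAndSet(null, owner)`. With many threads racing over the same records, the number of successful claims equals the number of records, i.e. no record is claimed twice. This model deliberately ignores lease expiry; if processing outlives the lease, another worker can legitimately reclaim the record, which is one way the same record can end up processed more than once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory model (not the MongoDB API): each record's lock is an
// AtomicReference, and claiming is compareAndSet(null, owner), mirroring an atomic
// single-document "find an unlocked record and set locked_by" update.
class ClaimRace {
    /** Lets `workers` threads race over `records` records; returns the number of successful claims. */
    static int run(int records, int workers) {
        List<AtomicReference<String>> lockedBy = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            lockedBy.add(new AtomicReference<>());
        }
        AtomicInteger successfulClaims = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            String owner = "worker-" + w;
            pool.submit(() -> {
                start.await(); // start all workers together to maximize contention
                for (AtomicReference<String> lock : lockedBy) {
                    // A failed CAS means another worker already holds this record: skip it
                    if (lock.compareAndSet(null, owner)) {
                        successfulClaims.incrementAndGet();
                    }
                }
                return null;
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return successfulClaims.get();
    }
}
```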
Also, we are not currently planning to shard the database, and records will be deleted once they are processed successfully. Another service will insert new records, but there will be no other queries on this collection apart from those mentioned above.
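Since records are deleted once processed successfully, each record's lifecycle is roughly: claim, process, then delete on success, or release the lease on failure so it can be retried. The single-threaded sketch below illustrates why that gives at-least-once rather than exactly-once behaviour: a record whose processing fails is processed again. It is plain Java with no MongoDB; `AtLeastOnceLoop`, `drain` and the `maxAttempts` cap are hypothetical, and the deque stands in for the collection.

```java
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of the record lifecycle: claim -> process ->
// delete on success, or release for a later retry on failure (at-least-once).
class AtLeastOnceLoop {
    /** Processes every pending id; `handler` returns true on success. Returns attempts per id. */
    static Map<String, Integer> drain(Deque<String> pending, Predicate<String> handler, int maxAttempts) {
        Map<String, Integer> attempts = new LinkedHashMap<>();
        String id;
        while ((id = pending.pollFirst()) != null) { // "claim" the next record
            int n = attempts.merge(id, 1, Integer::sum);
            if (handler.test(id)) {
                continue; // success: the record is deleted, so it is never claimed again
            }
            if (n < maxAttempts) {
                pending.addLast(id); // failure: release the lease so the record is retried
            }
            // else: give up on this record (assumption: a real system might park it elsewhere)
        }
        return attempts;
    }
}
```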
```java
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import org.bson.Document;

import java.util.Arrays;
import java.util.Date;

public class EventProcessor {
    private static final int LEASE_TIMEOUT_SECONDS = 60;

    private final MongoCollection<Document> collection;
    private final String instanceId;

    public EventProcessor(MongoCollection<Document> collection, String instanceId) {
        this.collection = collection;
        this.instanceId = instanceId;
    }

    public void processEvents() {
        while (true) {
            // Acquire a lease on the next available document
            Document document = acquireLease();
            if (document == null) {
                // No documents available for processing; sleep and try again later
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and stop polling
                    return;
                }
                continue;
            }
            try {
                // Process the document
                processDocument(document);
            } finally {
                // Release the lease (matches nothing if processing already deleted the document)
                releaseLease(document);
            }
        }
    }

    private Document acquireLease() {
        Date now = new Date();
        // Available = never locked or released (locked_by is null or missing),
        // OR locked by a holder whose lease has expired (e.g. a crashed instance)
        Document filter = new Document("$or", Arrays.asList(
                new Document("locked_by", null),
                new Document("lease_expires_at", new Document("$lt", now))));
        Document update = new Document("$set", new Document("locked_by", instanceId)
                .append("lease_expires_at", new Date(now.getTime() + LEASE_TIMEOUT_SECONDS * 1000L)));
        // Find one matching document and lock it in a single operation
        return collection.findOneAndUpdate(filter, update,
                new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
    }

    private void releaseLease(Document document) {
        // Only clear the lease if this instance still holds it
        collection.updateOne(
                new Document("_id", document.getObjectId("_id"))
                        .append("locked_by", instanceId),
                new Document("$set", new Document("locked_by", null)
                        .append("lease_expires_at", new Date())));
    }

    private void processDocument(Document document) {
        // TODO: Implement your document processing logic here
        // (records are deleted once processed successfully)
        System.out.println("Processing document: " + document.toJson());
    }
}
```
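One way to run several worker loops per instance is a fixed thread pool. In the sample above every thread in an instance would share the same `instanceId`, so the `locked_by` check in `releaseLease` cannot tell threads apart. Giving each thread its own owner id (the `instanceId + "-" + i` scheme here is a hypothetical choice) keeps that check specific to the thread that took the lease. To stay runnable without MongoDB, this sketch uses a `ConcurrentLinkedQueue` as a stand-in for `acquireLease()` and stops once the queue is drained, whereas the real loop sleeps and polls again; `WorkerPool` and `runUntilDrained` are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical worker pool: runs `threads` worker loops in one instance, giving each
// thread its own owner id so owner-filtered release/delete calls only match its own leases.
class WorkerPool {
    static void runUntilDrained(String instanceId, int threads, Queue<String> source,
                                BiConsumer<String, String> process) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            String ownerId = instanceId + "-" + i; // per-thread owner id
            pool.execute(() -> {
                String id;
                // poll() stands in for acquireLease(): each element is handed to one thread only
                while ((id = source.poll()) != null) {
                    process.accept(ownerId, id);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the MongoDB version, each worker would instead construct its own `EventProcessor` with its per-thread id and call `processEvents()`.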