How to efficiently handle transactions with the reactive Java driver / clarifications on the API

I have posted this question on Stack Overflow, but haven’t received any good answers so far: https://stackoverflow.com/questions/62810656/generic-transaction-error-handling-with-rxjava-mongodb

Basically, I find myself handling pretty much all transactions in the same way:

.flatMap(someRepository.commitTransaction(sessionReference)) // commit
.doOnError(throwable -> sessionReference.get().abortTransaction()) // rollback
.doFinally(() -> sessionReference.get().close()) 

As copying all that code every time isn’t a nice idea, I am looking for a generic way to handle this.

Additionally, a user suggested that I should use the API described here: https://docs.mongodb.com/manual/core/transactions/ as it would provide additional retry logic, handle write conflicts, etc.

I haven’t found anything in the docs indicating such a difference between those APIs. There are also no async/reactive examples, and I couldn’t find this API in the reactive driver. Could someone please clarify whether these claims are true? If so, what does one need to implement so that the reactive driver deals with those situations in a similar way?

bump.

Is there any official information available about transactions, retry behaviour and the reactive driver?

Bump again. Is there anyone on this forum with more in-depth knowledge about transactions and the reactive driver? Or is there perhaps a better place to ask this type of question?

Hi @st-h,

Would you be able to abstract that away in a convenient wrapper or method? Having said that, sometimes you do have to handle transactions differently; for example, in doOnError() you may want to do more than just abort.
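To illustrate the wrapper idea, here is a minimal sketch of a helper that commits on success, aborts on failure and always closes the session. To keep it self-contained it uses the JDK's CompletableFuture rather than RxJava or driver types; TxSession, TransactionHelper and inTransaction are hypothetical names and not part of the driver, so treat it as a starting point rather than a finished implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the driver's ClientSession (not a driver type).
interface TxSession {
    CompletableFuture<Void> commit();
    CompletableFuture<Void> abort();
    void close();
}

final class TransactionHelper {
    private TransactionHelper() {}

    /** Runs body in a transaction: commit on success, abort on failure, always close. */
    static <T> CompletableFuture<T> inTransaction(
            Supplier<CompletableFuture<TxSession>> startSession,
            Function<TxSession, CompletableFuture<T>> body) {
        return startSession.get().thenCompose(session ->
            CompletableFuture.completedFuture(session)
                // Starting from a completed future also captures exceptions thrown by body itself.
                .thenCompose(body)
                .<CompletableFuture<T>>handle((result, error) -> {
                    if (error == null) {
                        // Body succeeded: commit, then pass the body's result through.
                        return session.commit().thenApply(ignored -> result);
                    }
                    // Body failed: wait for the abort (ignoring abort failures),
                    // then fail with the original error.
                    return session.abort()
                        .exceptionally(abortError -> null)
                        .thenCompose(ignored -> CompletableFuture.<T>failedFuture(error));
                })
                .thenCompose(next -> next)
                // Runs on success and on failure, like doFinally in the question.
                .whenComplete((result, error) -> session.close()));
    }
}
```

A caller would then only supply the session source and the operations, for example TransactionHelper.inTransaction(startSession, session -> doWrites(session)), where doWrites is whatever the repository code does. Note that in this sketch a failed commit is not followed by an abort; whether that is what you want depends on your use case.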

Depending on your use case and code structure, you could perhaps use await to synchronise the transaction part and structure the code differently. For example:

// Start a session and block until it is available (SECONDS is TimeUnit.SECONDS, statically imported).
ObservableSubscriber<ClientSession> sessionSubscriber = new OperationSubscriber<>();
mongoClient.startSession().subscribe(sessionSubscriber);
sessionSubscriber.await(5, SECONDS);

// try-with-resources closes the session when the block exits.
try (ClientSession clientSession = sessionSubscriber.getReceived().get(0)) {
    clientSession.startTransaction(TransactionOptions.builder()
            .writeConcern(WriteConcern.MAJORITY).build());

    // operations ..

    // Commit and block until the commit has completed.
    ObservableSubscriber<Void> commitSubscriber = new OperationSubscriber<>();
    clientSession.commitTransaction().subscribe(commitSubscriber);
    commitSubscriber.await();
}

Unfortunately, the convenient withTransaction() API is currently available only in the synchronous driver (as of v4.1). There is an open ticket, JAVA-3539, that tracks this work; feel free to upvote or watch it to receive notifications on progress.
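Until then, the retry behaviour of the callback API can be approximated by hand. As far as I understand it, that API retries the whole transaction when an error carries the TransientTransactionError label and retries only the commit when it carries UnknownTransactionCommitResult, within a time limit. Below is a rough sketch of that decision using only JDK types. LabeledException is a hypothetical stand-in for the driver's MongoException (which has hasErrorLabel), TransactionRetry and retryOnLabel are made-up names, and the attempt limit is a simplification in place of a time limit.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// Hypothetical stand-in for the driver's MongoException, which exposes hasErrorLabel(String).
final class LabeledException extends RuntimeException {
    private final Set<String> labels;

    LabeledException(String message, String... labels) {
        super(message);
        this.labels = Set.of(labels);
    }

    boolean hasErrorLabel(String label) {
        return labels.contains(label);
    }
}

final class TransactionRetry {
    static final String TRANSIENT = "TransientTransactionError";
    static final String UNKNOWN_COMMIT = "UnknownTransactionCommitResult";

    private TransactionRetry() {}

    /** Re-runs attempt while it fails with the given error label, up to maxAttempts runs in total. */
    static <T> CompletableFuture<T> retryOnLabel(
            Supplier<CompletableFuture<T>> attempt, String label, int maxAttempts) {
        return CompletableFuture.completedFuture(label)
            // Exceptions thrown by attempt.get() itself are captured here as well.
            .thenCompose(ignored -> attempt.get())
            .<CompletableFuture<T>>handle((result, error) -> {
                if (error == null) {
                    return CompletableFuture.completedFuture(result);
                }
                // Dependent stages wrap failures in CompletionException; unwrap to inspect the label.
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause() : error;
                boolean retryable = cause instanceof LabeledException
                        && ((LabeledException) cause).hasErrorLabel(label)
                        && maxAttempts > 1;
                if (retryable) {
                    return retryOnLabel(attempt, label, maxAttempts - 1);
                }
                return CompletableFuture.<T>failedFuture(cause);
            })
            .thenCompose(next -> next);
    }
}
```

The idea would be to wrap the commit alone in retryOnLabel(..., TransactionRetry.UNKNOWN_COMMIT, n) and the whole start/operations/commit sequence in retryOnLabel(..., TransactionRetry.TRANSIENT, n), so that each attempt starts a fresh transaction.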

Regards,
Wan.