Hi,
I noticed different behavior of the driver in Change Stream in exceptions handling when I upgraded from mongo-scala-driver 2.9.0 to 4.6.0.
I assume that in the code that is handling events of changes in MongoDB (onNext method of the Observer) the exception can be thrown. In that case in onError method I was subscribing to events again. It was working in old version of the driver. Simply when onNext thrown exception, then onError method was called: mongo-java-driver/AbstractSubscription.java at r3.12.1 · mongodb/mongo-java-driver · GitHub
After upgrade to 4.6.0 I noticed that project reactor is used and different code is handling exceptions: reactor-core/FluxCreate.java at main · reactor/reactor-core · GitHub
In the end onError method is not invoked.
This is my simple test for that.
import org.mongodb.scala.{Document, Observer}
import org.mongodb.scala.model.changestream.ChangeStreamDocument
object ObserveTest extends Basic with App {
val separator = "\n\n"
def subscribe(): Unit = {
test_collection.watch().subscribe(new Observer[ChangeStreamDocument[Document]] {
override def onNext(result: ChangeStreamDocument[Document]): Unit = {
println(s"$separator onNext: $result$separator")
throw new RuntimeException("error")
}
override def onError(e: Throwable): Unit = {
println(s"$separator onError: $e$separator ")
subscribe()
}
override def onComplete(): Unit = println(s"$separator onComplete $separator ")
})
}
subscribe()
}
With
“org.mongodb.scala” %% “mongo-scala-driver” % “2.9.0”
it works perfeclty.
But with
“org.mongodb.scala” %% “mongo-scala-driver” % “4.6.0”
when exception is thrown after first event, the subscriber is closed, I don’t receive more events.
Is there any way to handle exception that could potentially been thrown from the code run in onNext method?