Handling exceptions in Change Stream

Hi,
I noticed different behavior of the driver in Change Stream in exceptions handling when I upgraded from mongo-scala-driver 2.9.0 to 4.6.0.
I assume that in the code that is handling events of changes in MongoDB (onNext method of the Observer) the exception can be thrown. In that case in onError method I was subscribing to events again. It was working in old version of the driver. Simply when onNext thrown exception, then onError method was called: mongo-java-driver/AbstractSubscription.java at r3.12.1 · mongodb/mongo-java-driver · GitHub
After upgrade to 4.6.0 I noticed that project reactor is used and different code is handling exceptions: reactor-core/FluxCreate.java at main · reactor/reactor-core · GitHub
In the end onError method is not invoked.

This is my simple test for that.

import org.mongodb.scala.{Document, Observer}
import org.mongodb.scala.model.changestream.ChangeStreamDocument

object ObserveTest extends Basic with App {

  val separator = "\n\n"

  def subscribe(): Unit = {
    test_collection.watch().subscribe(new Observer[ChangeStreamDocument[Document]] {
      override def onNext(result: ChangeStreamDocument[Document]): Unit = {
        println(s"$separator onNext: $result$separator")
        throw new RuntimeException("error")
      }

      override def onError(e: Throwable): Unit = {
        println(s"$separator onError: $e$separator ")
        subscribe()
      }

      override def onComplete(): Unit = println(s"$separator onComplete $separator ")
    })
  }

  subscribe()

}

With
“org.mongodb.scala” %% “mongo-scala-driver” % “2.9.0”
it works perfeclty.
But with
“org.mongodb.scala” %% “mongo-scala-driver” % “4.6.0”
when exception is thrown after first event, the subscriber is closed, I don’t receive more events.

Is there any way to handle exception that could potentially been thrown from the code run in onNext method?

Hi @Michal,

The Observer in your code does not return normally, it throws an exception. Driver’s Observer is an org.reactivestreams.Subscriber, for which the rule #13 in the reactive streams specification states

Calling onSubscribe , onNext , onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription . In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

The last part of the rule is further clarified in the spec

«Raise this error condition in a fashion that is adequate for the runtime environment» could mean logging the error—or otherwise make someone or something aware of the situation—as the error cannot be signalled to the faulty Subscriber.

The Observer in your code violates this rule. Even if version 2.9 behaved differently, it is a requirement now. Subscriber.onError exists as the “only legal way to signal failure to a Subscriber (a quote from the spec), i.e., if a Subscriber throws, there is no need to notify it via onError that an exception was thrown, because it was the Subscriber who threw it.

Your Observer may cancel the subscription, and then you may create a new one, if that is what you want, but the Observer methods must return normally.

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.