This guide provides background about the Java Reactive Streams driver and its asynchronous API. The guide also lists and explains sample custom subscriber implementations.
Note
For instructions on how to install the driver, see the Get Started guide.
Reactive Streams
This library is an implementation of the reactive streams specification. The reactive stream API consists of the following components:
A Publisher is a provider of a potentially unbounded number of sequenced elements, published according to the demand received from its Subscriber or multiple instances of Subscriber.
In response to a call to Publisher.subscribe(Subscriber), the possible invocation sequences for methods on the Subscriber class are given by the following protocol:
onSubscribe onNext* (onError | onComplete)?
This means that onSubscribe is always signaled, followed by a possibly unbounded number of onNext signals, as requested by Subscriber. This is followed by an onError signal if there is a failure or an onComplete signal when no more elements are available, as long as the Subscription is not canceled.
Tip
To learn more about reactive streams, visit the Reactive Streams documentation.
Subscribers
The Java Reactive Streams driver API mirrors the Java Sync driver API and any methods that cause network I/O to return a Publisher<T> type, where T is the type of response for the operation.
Note
All Publisher types returned from the API are cold, meaning that nothing happens until they are subscribed to. So just creating a Publisher won’t cause any network I/O. It’s not until you call the Publisher.subscribe() method that the driver executes the operation.
Publishers in this implementation are unicast. Each Subscription to a Publisher relates to a single MongoDB operation, and the Publisher instance's Subscriber receives its own specific set of results.
Custom Subscriber Implementations
In the Java Reactive Streams documentation, we have implemented different Subscriber types. Although this is an artificial scenario for reactive streams, we do block on the results of one example before starting the next to ensure the state of the database. To see the source code for all the custom subscriber implementations, see SubscriberHelpers.java in the driver source code.
ObservableSubscriber- The base subscriber class is the ObservableSubscriber<T>, a
Subscriberthat stores the results of thePublisher<T>. It also contains anawait()method so we can block for results to ensure the state of the database before going on to the next example.
OperationSubscriber- An implementation of the
ObservableSubscriberthat immediately callsSubscription.request()when it is subscribed to.
PrintSubscriber- An implementation of the
OperationSubscriberthat prints a message when theSubscriber.onComplete()method is called.
ConsumerSubscriber- An implementation of
OperationSubscriberthat takes aConsumerand callsConsumer.accept(result)whenSubscriber.onNext(T result)is called.
PrintToStringSubscriber- An implementation of
ConsumerSubscriberthat prints the string version of theresultwhen theSubscriber.onNext()method is called.
PrintDocumentSubscriber- An implementation of the
ConsumerSubscriberthat prints the JSON version of aDocumenttype when theSubscriber.onNext()method is called.
Blocking and Non-Blocking Examples
As our Subscriber types contain a latch that is only released when the onComplete() method of the Subscriber is called, we can use that latch to block further actions by calling the await method. The following two examples use our auto-requesting PrintDocumentSubscriber.
The first is non-blocking and the second blocks waiting for the Publisher to complete:
// Create a publisher Publisher<Document> publisher = collection.find(); // Non-blocking publisher.subscribe(new PrintDocumentSubscriber()); Subscriber<Document> subscriber = new PrintDocumentSubscriber(); publisher.subscribe(subscriber); subscriber.await(); // Block for the publisher to complete
Publishers, Subscribers, and Subscriptions
In general, Publisher, Subscriber and Subscription types comprise a low level API and it’s expected that users and libraries will build more expressive APIs upon them rather than solely use these interfaces. As a library solely implementing these interfaces, users will benefit from this growing ecosystem, which is a core design principle of reactive streams.