MongoDB Java Reactive Streams help

I know how to insert documents from http://mongodb.github.io/mongo-java-driver-reactivestreams/1.13/getting-started/quick-tour/, but how do I get the document from the db when collection.find().first().subscribe(subscriber); returns a Publisher? Also, if all actions are performed asynchronously, how can I make them synchronous?

Hello @Zdziszkee_N_A,

… how to get the document from db when collection.find().first().subscribe(subscriber); returns Publisher?

collection.find().first() returns an org.reactivestreams.Publisher<TResult>.

The Publisher's subscribe(Subscriber<? super T> s) method “Requests Publisher to start streaming data.” It returns void, not a Publisher; the results are delivered to the subscriber's onNext callback.

The tutorial’s examples have a subscriber defined; use that code. There is an example PrintSubscriber, and this can be used to print the find query result.

SubscriberHelpers is a utility class that ships with the examples and is used throughout the Quick Tour. The source code for SubscriberHelpers.java can be found on GitHub: https://github.com/mongodb/mongo-java-driver-reactivestreams/tree/master/examples/tour/src/main/tour. You can use it in your own program.

Also if all actions are performed async how to make it sync ?

If you need synchronous behavior throughout, you can try the synchronous MongoDB Java driver APIs instead.

Okay, but how do I get the Document object itself from that? By casting?

The Quick Tour - Query the Collection - Find the First Document in a Collection has the code snippet:

subscriber = new PrintDocumentSubscriber();
collection.find().first().subscribe(subscriber);
subscriber.await();

When this code gets executed, it prints the first document from the collection. The print happens using the PrintDocumentSubscriber (which has a description: “A Subscriber that prints the json version of each document”) from the SubscriberHelpers class (I have already provided the link in the previous post).

I don’t think casting is the way to get your document. It looks like you have to use the SubscriberHelpers class API from the tutorial, or build your own helpers / code to do the tasks you have in mind.
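
To give an idea of what "build your own helper" could look like, here is a rough sketch of a subscriber that keeps the first item it receives and lets the caller wait for it. Everything named here (FirstItemSubscriber, FirstItemDemo, firstOf) is made up for illustration. To keep it runnable without extra dependencies it uses the JDK's java.util.concurrent.Flow interfaces, which mirror org.reactivestreams; with the driver you would implement org.reactivestreams.Subscriber<Document> in the same way. A SubmissionPublisher stands in for collection.find().first().

```java
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: captures the first item a publisher emits and lets the caller wait for it. */
final class FirstItemSubscriber<T> implements Flow.Subscriber<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicReference<T> item = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // nothing is delivered until demand is signalled
    }

    @Override
    public void onNext(T value) {
        if (item.compareAndSet(null, value)) {
            subscription.cancel(); // only the first item is of interest
            done.countDown();
        }
    }

    @Override
    public void onError(Throwable t) {
        error.set(t);
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown(); // completed without an item: the result stays empty
    }

    /** Blocks until an item arrives or the publisher completes or fails. */
    Optional<T> await(long timeout, TimeUnit unit) {
        try {
            if (!done.await(timeout, unit)) {
                throw new IllegalStateException("Publisher did not respond in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        Throwable t = error.get();
        if (t != null) {
            throw new IllegalStateException("Publisher failed", t);
        }
        return Optional.ofNullable(item.get());
    }
}

final class FirstItemDemo {
    /** Publishes the given values and returns the first one, if any. */
    static Optional<String> firstOf(String... values) {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            FirstItemSubscriber<String> subscriber = new FirstItemSubscriber<>();
            publisher.subscribe(subscriber);
            for (String value : values) {
                publisher.submit(value);
            }
            publisher.close(); // signals onComplete once pending items are delivered
            return subscriber.await(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstOf("{\"title\": \"Dune\"}")); // Optional[{"title": "Dune"}]
        System.out.println(firstOf());                        // Optional.empty
    }
}
```

No casting is involved: the subscriber is typed, so the value that comes out of await is already a T (a Document in the driver's case). Returning an Optional is just one way to represent "the collection was empty".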

Hi,

The Reactive Streams API is a fairly low-level API that was designed to be a foundation for async stream processing in Java. There are libraries that extend the Reactive Streams API and make it much more user friendly:

  • RxJava - The ReactiveX project is really well explained and has many helpers to aid the use of reactive streams.
  • Project Reactor - From the Spring team, a smaller library that is packed full of great features.

I would suggest using a higher-level library, which makes Publishers much easier to consume.

Ross

… how to get the document from db when collection.find().first().subscribe(subscriber); returns Publisher ? Also if all actions are performed async how to make it sync ?

Hi @Zdziszkee_N_A, revisiting the post with example code using RxJava. It is much easier to work with the RxJava APIs, as mentioned by @Ross_Lawley.

This is how you would query using RxJava. RxJava's Maybe class lets you get the queried document from the Publisher<Document> returned by the Reactive Streams Java driver's collection.find().first() method.

Maybe is a flow with no items, exactly one item, or an error. This is suitable in this case, as collection.find().first() returns a publisher that emits one document or, if there are no documents in the collection, completes empty.

Example Code:

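import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import io.reactivex.Maybe; // RxJava 2; in RxJava 3 this is io.reactivex.rxjava3.core.Maybe
import org.bson.Document;
import org.reactivestreams.Publisher;

MongoClient mongoClient = MongoClients.create();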
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("books");
Publisher<Document> first = collection.find().first();

// The method returns the first document from the query _or_ an empty document.
Document maybeDoc = Maybe.fromPublisher(first)
                         .blockingGet(new Document());

// Do something with the document
System.out.println(maybeDoc);

NOTE: In the above code, the Maybe.blockingGet method blocks the calling thread until the query completes, so the next statement runs only once the result is available. There is also a blockingGet() without parameters, which returns null if there are no documents in the collection.