Mongo DB - Migrating from Async Driver to Reactive driver

Hi ,
I have been trying to move from Mongo-DB async driver to Java reactive driver.
so far I have been successful in migrating most of the operations.
But I’m stuck with MongoDbIterable and trying to find a compatible version for reactive driver

Here is the code snippet for async driver

       String param = "hello";
        database. getCollection("sample").find(Filters.eq("mongo", param)).forEach(
                new Block<T>() {
                    @Override
                    public void apply(ProcessingProtectedRegion region) {
                    //my code to handle
                    }
                },
              //implementation of  SingleResultCallback<T>
        );

Im trying to migrate the above snippet to Reactive driver but not able to find the correct operation which would behave similar to the ForEach() of async driver that takes 2 parameter as it react driver operations always needs subscriber

documentation of Async driver ForEach opperation
     /* Iterates over all documents in the view, applying the given block to each, and completing the returned future after all documents
     * have been iterated, or an exception has occurred.
    
     * @param block    the block to apply to each document
     * @param callback a callback that completed once the iteration has completed
     */
    void forEach(Block<? super TResult> block, SingleResultCallback<Void> callback)

The equivalent Reactive Streams code would look something like this:

        collection.find().subscribe(new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription s) {
                // this is required by Reactive Streams to indicate "demand"
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Document document) {
                // this method is called for every document
            }

            @Override
            public void onError(Throwable t) {
                // this method is called once if there is an error
            }

            @Override
            public void onComplete() {
                // this method is called once if there is no error and after all documents have been iterated
            }
        });
    }

In practice you would probably want to define a base class implementing the Subscriber interface that you could re-use across all your queries, or else rely on a third-party library like Project Reactor, which does this for you as well as a whole lot more.

Good luck!

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.