Mongodb-driver-reactivestreams createIndex() doesn't make new Index in MongoDB

Just started working with MongoDB in general and I have a local project that I want to make some indexes on. I’ve tried using the code below but it doesn’t add a new index when I check my collection after this part is executed.

@Inject MongoService mongoService; // this has some methods like getting the MongoDatabase for example
...
MongoDatabase database = mongoService.getDatabase();
MongoCollection<Document> collection = database.getCollection("myCollection");
collection.createIndex(Indexes.ascending("dateCreated"));

I’ve followed this reference minus the subscribe part just to test it out: Create Indexes

Hello @Junmil_Rey_So, welcome to the community.

The API method collection.createIndex returns an org.reactivestreams.Publisher.

As a result, collection.createIndex doesn’t create the index until the returned publisher’s subscribe() method is invoked and the subscriber requests data. The subscribe method takes an org.reactivestreams.Subscriber as a parameter, which is an interface you implement. Its callback methods (onSubscribe, onNext, onError, onComplete) request the data and receive the result of the index creation once the operation runs.

Publisher<String> indexPublisher = collection.createIndex(Indexes.ascending("dateCreated"));

// The subscribe method requests the publisher to start the stream of data.
indexPublisher.subscribe(new Subscriber<String>() {

    public void onSubscribe(Subscription s) {
        // The operation runs only once demand is signalled through request().
        // The argument 1 is the number of elements the subscriber is ready to receive.
        s.request(1);
    }

    public void onNext(String s) {
        System.out.println("Index created: " + s); // this will be the new index name
    }

    public void onError(Throwable t) {
        System.out.println("Failed: " + t.getMessage());
    }

    public void onComplete() {
        System.out.println("Completed");
    }
});

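// Crude way to keep the main thread alive while the index is created asynchronously.
// A fixed sleep does not guarantee that the operation has finished.
try {
    Thread.sleep(3000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
}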
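
If the fixed Thread.sleep feels fragile, one option is to wait on a CountDownLatch that the subscriber releases in onComplete or onError. Below is a minimal sketch of that idea. To keep it runnable without a MongoDB server, it uses the JDK's java.util.concurrent.Flow interfaces (which mirror the org.reactivestreams ones) and a made-up fakeCreateIndex publisher in place of collection.createIndex. LatchWaitSketch, fakeCreateIndex and awaitIndexName are illustrative names, not driver API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class LatchWaitSketch {

    // Hypothetical stand-in for collection.createIndex(...):
    // emits a single index name on another thread once demand is requested.
    static Flow.Publisher<String> fakeCreateIndex(String indexName) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean done = new AtomicBoolean();

            @Override
            public void request(long n) {
                if (n > 0 && done.compareAndSet(false, true)) {
                    CompletableFuture.runAsync(() -> {
                        subscriber.onNext(indexName);
                        subscriber.onComplete();
                    });
                }
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }

    // Subscribes, then blocks until onComplete/onError fires or the timeout elapses.
    static String awaitIndexName(Flow.Publisher<String> publisher, long timeoutSeconds)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String name) { result.set(name); }
            @Override public void onError(Throwable t) { latch.countDown(); }
            @Override public void onComplete() { latch.countDown(); }
        });

        if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Timed out waiting for the publisher");
        }
        return result.get(); // null if the publisher failed before emitting
    }

    public static void main(String[] args) throws InterruptedException {
        String name = awaitIndexName(fakeCreateIndex("dateCreated_1"), 5);
        System.out.println("Index created: " + name);
    }
}
```

With the real driver, the same latch pattern should carry over to a Subscriber<String> passed to the publisher returned by createIndex; only the imports change from Flow.* to org.reactivestreams.*.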

You can also try one of the Java implementations from ReactiveX (like RxJava) instead of the low-level API from Reactive Streams.

Thanks a lot for the clear explanation, I see that I need to call the subscribe() method after creating my index then.

I am trying to get my Gradle project using Micronaut and reactivex to index my collections on startup, does that affect how/when I should invoke the subscribe() on them?

Hi @Junmil_Rey_So, the index will be created only when the subscribe method is run.
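
To illustrate why the timing of subscribe() matters, here is a small sketch of a "cold" publisher: building it does no work, and the work happens only after a subscriber subscribes and requests data. It is not the MongoDB driver. It uses the JDK's java.util.concurrent.Flow interfaces and a hypothetical fakeCreateIndex method, with a counter standing in for the call to the server, on the assumption that the driver's publishers behave the same way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyPublisherSketch {

    // Counts how often the simulated "create index" work actually ran.
    static final AtomicInteger executions = new AtomicInteger();

    // Hypothetical stand-in for collection.createIndex(...); works only on request(n).
    static Flow.Publisher<String> fakeCreateIndex(String indexName) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (n <= 0 || done) {
                    return;
                }
                done = true;
                executions.incrementAndGet(); // a real driver would contact the server here
                subscriber.onNext(indexName);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes, requests one element, and returns whatever was emitted.
    static List<String> subscribeAndCollect(Flow.Publisher<String> publisher) {
        List<String> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String name) { received.add(name); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        Flow.Publisher<String> publisher = fakeCreateIndex("dateCreated_1");
        System.out.println("Executions before subscribe: " + executions.get());
        List<String> names = subscribeAndCollect(publisher);
        System.out.println("Executions after subscribe: " + executions.get());
        System.out.println("Received: " + names);
    }
}
```

So for startup indexing, whichever startup hook builds the publisher also needs to subscribe to it (directly or through a wrapper such as RxJava); holding on to the publisher alone does nothing.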
