Async reactive stream driver subscriber hangs

Hi all,

I’m currently trying to use the async Reactive Streams driver in Java and have an issue with a hanging subscriber. I have a class that implements ‘Subscriber’ with the necessary overrides. After some testing I found that when the subscriber runs against an empty collection it works fine: onNext() is never called and onComplete() runs as intended. But after saving a document to the collection and trying to load it again, only onSubscribe() is called (its debug message shows up in the console) and then nothing else happens: no debug messages from onNext(), onError(), or onComplete() appear. Does anyone have an idea why this is happening and what I’ve done wrong?

This is the code that attaches the subscriber implementation ‘LoadPlayerWrappersSubscriber’ to the publisher:

    /**
     * Load a collection of PlayerWrappers from the database corresponding to the given UUIDs. Will skip any UUIDs that
     * are already loaded. Will not execute if the given set of UUIDs is empty.
     *
     * @param playerUUIDs Set of player UUIDs to load
     */
    public void loadPlayers(Set<UUID> playerUUIDs) {
        //Skip if no players to load
        if (playerUUIDs.isEmpty()) {
            UDebug.logInfo("No players to load!", this.getClass().getSimpleName(), true);
            return;
        }

        UDebug.logInfo("loadPlayers() unfiltered: " + String.join(",", playerUUIDs.stream().map(UUID::toString).collect(Collectors.toSet())), this.getClass().getSimpleName(), true);


        //Remove existing loaded players from the lookup set
        playerUUIDs.removeAll(this.wrappers.keySet());

        UDebug.logInfo("loadPlayers() filtered: " + String.join(",", playerUUIDs.stream().map(UUID::toString).collect(Collectors.toSet())), this.getClass().getSimpleName(), true);

        //Get the collection to load from
        MongoCollection<Document> playerCollection = this.plugin.getMongoDatabase().getCollection("players");

        //Create the filter (selection of documents to load using 'in' operator)
        List<String> playerUUIDStrings = playerUUIDs.stream().map(UUID::toString).toList();
        Bson filter = Filters.in("uuid", playerUUIDStrings);

        //Execute the load for the remaining players
        playerCollection.find(filter).subscribe(new LoadPlayerWrappersSubscriber(this, playerUUIDs));
    }

And this is the LoadPlayerWrappersSubscriber class implementation:

public class LoadPlayerWrappersSubscriber implements Subscriber<Document> {

    private final PlayerManager playerManager;
    private final Set<UUID> lookupUUIDs = new HashSet<>();

    public LoadPlayerWrappersSubscriber(PlayerManager playerManager, Set<UUID> lookupUUIDs) {
        this.playerManager = playerManager;
        this.lookupUUIDs.addAll(lookupUUIDs);
    }

    @Override
    public void onSubscribe(Subscription s) {
        s.request(this.lookupUUIDs.size());

        UDebug.logInfo("onSubscribe() loading", getClass().getSimpleName(), true);
    }

    @Override
    public void onNext(Document document) {
        UUID playerWrapperUUID = UUID.fromString(document.getString("uuid"));
        this.playerManager.addWrapper(playerWrapperUUID, new PlayerWrapper(document));
        this.lookupUUIDs.remove(playerWrapperUUID);

        UDebug.logInfo(String.format("Loaded player wrapper for UUID: %s", playerWrapperUUID), getClass().getSimpleName(), true);
    }

    @Override
    public void onError(Throwable t) {
        UDebug.logSevere("Failed to load player(s) from database!", getClass().getSimpleName(), false);
        UDebug.logSevere(t.getLocalizedMessage(), getClass().getSimpleName(), false);
    }

    @Override
    public void onComplete() {
        UDebug.logInfo("Finished loading player(s) from database!", getClass().getSimpleName(), true);

        for (UUID uuid : this.lookupUUIDs) {
            this.playerManager.addWrapper(uuid, new PlayerWrapper(uuid));

            UDebug.logInfo(String.format("Created player wrapper for UUID: %s", uuid), getClass().getSimpleName(), true);
        }
    }
}
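
To make the expected behaviour concrete, here is a minimal sketch of the signal order a subscriber like the one above should see. It is an illustration only: it uses the JDK's java.util.concurrent.Flow API and SubmissionPublisher as a stand-in for the driver's publisher (an assumption, so it runs without a database), and the names ExpectedSignalOrder and RecordingSubscriber are hypothetical. It says nothing about how the MongoDB driver behaves internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: JDK Flow types replace the driver's Publisher/Subscriber.
class ExpectedSignalOrder {

    /** Records every signal it receives so the order can be inspected afterwards. */
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final long demand;

        RecordingSubscriber(long demand) {
            this.demand = demand;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            events.add("onSubscribe");
            // Same pattern as the subscriber above: request a fixed number of items.
            s.request(demand);
        }

        @Override
        public void onNext(String item) {
            events.add("onNext:" + item);
        }

        @Override
        public void onError(Throwable t) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    /** Publishes the given items, closes the publisher, and returns the recorded signals. */
    static List<String> run(List<String> items, long demand) throws InterruptedException {
        RecordingSubscriber subscriber = new RecordingSubscriber(demand);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals completion once buffered items are delivered
        // Wait (bounded) for a terminal signal, since delivery is asynchronous.
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.events;
    }
}
```

With this stand-in, ExpectedSignalOrder.run(List.of("doc"), 1) records onSubscribe, onNext:doc, onComplete, and ExpectedSignalOrder.run(List.of(), 1) records onSubscribe, onComplete. The second case matches what happens with the empty collection; the first is the order that never shows up once a document has been saved.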

And this is the Reactive Streams driver dependency in my pom.xml:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
    <version>4.11.0</version>
</dependency>

I’ve also created a topic about this issue on another forum (Spigot), where I’ve given more details about the situation in case they are needed.