Esta guía proporciona información general sobre el controlador Java Reactive Streams y su API asíncrona. También enumera y explica ejemplos de implementaciones de suscriptores personalizados.
Nota
Para obtener instrucciones sobre cómo instalar el controlador, consulte la Guía deinicio rápido.
Flujos reactivos
Esta biblioteca implementa la especificación de flujos reactivos. La API de flujos reactivos consta de los siguientes componentes:
A Publisher es un proveedor de un número potencialmente ilimitado de elementos secuenciados, publicados según la demanda recibida de su Subscriber o múltiples instancias de Subscriber.
En respuesta a una llamada a Publisher.subscribe(Subscriber), las posibles secuencias de invocación para los métodos de la clase Subscriber se dan mediante el siguiente protocolo:
onSubscribe onNext* (onError | onComplete)?
Esto significa que siempre se envía la señal onSubscribe, seguida de un número posiblemente ilimitado de señales onNext, según lo solicitado por Subscriber. A esto le sigue una señal onError si hay un fallo o una señal onComplete cuando no hay más elementos disponibles, siempre que la señal Subscription no se cancele.
Tip
Para obtener más información sobre los flujos reactivos, visita la documentación de Flujos reactivos.
Suscriptores
La API del controlador Java Reactive Streams refleja la API del controlador Java Sync y cualquier método que haga que la E/S de red devuelva un Publisher<T> tipo, donde T es el tipo de respuesta para la operación.
Nota
Todos los Publisher tipos devueltos por la API son fríos, lo que significa que no ocurre nada hasta que se suscriben. Por lo tanto, la simple creación de un Publisher no causará ninguna E/S de red.Publisher.subscribe() El controlador no ejecuta la operación hasta que se llama al método.
Los publicadores en esta implementación son unicast.Cada Subscription a un Publisher se relaciona con una sola operación de MongoDB, y el de la Publisher instancia Subscriber recibe su propio conjunto de resultados.
Implementaciones de suscriptores personalizados
En la documentación de Java Reactive Streams, hemos implementado diferentes Subscriber tipos. Si bien este es un escenario artificial para flujos reactivos, bloqueamos los resultados de un ejemplo antes de iniciar el siguiente para garantizar el estado de la base de datos. Para ver el código fuente de todas las implementaciones de suscriptores personalizados, consulte SubscriberHelpers.java en el código fuente del controlador.
ObservableSubscriber- La clase base del suscriptor es el ObservableSubscriber<T>, un
Subscriberque almacena los resultados delPublisher<T>. También contiene un métodoawait()para que podamos bloquear los resultados y garantizar el estado de la base de datos antes de pasar al siguiente ejemplo.
OperationSubscriber- Una implementación de
ObservableSubscriberque llama inmediatamente aSubscription.request()cuando se suscribe a él.
PrintSubscriber- Una implementación de
OperationSubscriberque imprime un mensaje cuando se llama al métodoSubscriber.onComplete().
ConsumerSubscriber- Una implementación de
OperationSubscriberque toma unConsumery llama aConsumer.accept(result)cuando se llama aSubscriber.onNext(T result).
PrintToStringSubscriber- Una implementación de
ConsumerSubscriberque imprime la versión de cadena deresultcuando se llama al métodoSubscriber.onNext().
PrintDocumentSubscriber- Una implementación de
ConsumerSubscriberque imprime la versión JSON de un tipoDocumentcuando se llama al métodoSubscriber.onNext().
Ejemplos de bloqueo y no bloqueo
Como nuestros tipos Subscriber contienen un pestillo que solo se libera al llamar al método onComplete() del Subscriber, podemos usar ese pestillo para bloquear acciones posteriores llamando al método await. Los dos ejemplos siguientes usan nuestra solicitud automática PrintDocumentSubscriber.
El primero no es bloqueante y el segundo se bloquea esperando a que Publisher se complete:
// Create a publisher Publisher<Document> publisher = collection.find(); // Non-blocking publisher.subscribe(new PrintDocumentSubscriber()); Subscriber<Document> subscriber = new PrintDocumentSubscriber(); publisher.subscribe(subscriber); subscriber.await(); // Block for the publisher to complete
Editores, suscriptores y suscripciones
En general, los tipos Publisher, Subscriber y Subscription constituyen una API de bajo nivel, y se espera que los usuarios y las bibliotecas creen API más expresivas a partir de ellos, en lugar de usar únicamente estas interfaces. Como biblioteca que implementa exclusivamente estas interfaces, los usuarios se beneficiarán de este ecosistema en crecimiento, que es un principio de diseño fundamental de los flujos reactivos.