このガイドでは、 Java Reactive Streams ドライバーとその非同期APIの背景を説明します。 このガイドでは、サンプルのカスタム サブスクライブの実装もリストして説明しています。
注意
ドライバーをインストールする方法については、「 を使い始める 」ガイドを参照してください。
Reactive Streams
このライブラリは、リアクティブ ストリーム仕様の実装です。 Reactive Stream API は、次のコンポーネントで構成されています。
Publisherは、 SubscriberまたはSubscriberの複数のインスタンスから受信した要求に従って公開される、潜在的に無制限の数のシーケンス化された要素のプロバイダーです。
Publisher.subscribe(Subscriber)への呼び出しに応答して、 Subscriberクラスのメソッドに可能な呼び出しシーケンスは、次のプロトコルによって提供されます。
onSubscribe onNext* (onError | onComplete)?
つまり、 onSubscribeは常にシグナルられ、その後にSubscriberからリクエストされた場合、上限の数のonNextシグナルが続く可能性があります。 これには、障害が発生した場合はonErrorシグナルが送信され、 Subscriptionがキャンセルされていない限り、使用できる要素が ない 場合はonCompleteシグナルが送信されます。
Tip
Reactive Streams の詳細については、Reactive Streams のドキュメント を参照してください。
サブスクリプション
Java Reactive Streams ドライバーAPIは、 Java Sync Driver APIとネットワーク I/O により Publisher<T> タイプが返されるすべてのメソッドをミラーリングします。ここでは、T は操作の応答のタイプです。
カスタム サブスクリプションの実装
Java Reactive Streams のドキュメントでは、さまざまな Subscriber 型を実装しました。これはリアクティブなストリームの論理的なシナリオですが、データベースの状態を確保するために、次の例を開始する前に、ある例の結果をブロックします。すべてのカスタム サブスクリプション実装のソースコードを確認するには、 ドライバーソースコードの SubscriberHelpers.java を参照してください。
ObservableSubscriber- 基本サブスクライブクラスは ObservableSubscriber[T]<T>
Subscriberで、 の結果を保存するPublisher<T>です。また、await()メソッドも含まれているため、結果の をブロックして、次の例に進む前にデータベースの状態を確保できます。
OperationSubscriber- サブスクライブ時にすぐに
Subscription.request()を呼び出すObservableSubscriberの実装。
PrintSubscriberSubscriber.onComplete()メソッドが呼び出されたときにメッセージを出力するOperationSubscriberの実装。
ConsumerSubscriberOperationSubscriberConsumerConsumer.accept(result)Subscriber.onNext(T result)を受け取り、 が呼び出されたときに呼び出す の実装。
PrintToStringSubscriberSubscriber.onNext()メソッドが呼び出されたときにresultの string バージョンを出力するConsumerSubscriberの実装。
PrintDocumentSubscriberSubscriber.onNext()メソッドの呼び出し時にDocumentタイプの JSON バージョンを出力するConsumerSubscriberの実装。
ブロッキングと非ブロッキングの例
Subscriber型には、 SubscriberのonComplete()メソッドが呼び出された場合にのみリリースされるラッチが含まれているため、 awaitメソッドを呼び出すことで、そのラッチを使用してそれ以上のアクションをブロックできます。 次の 2 つの例では、自動リクエストPrintDocumentSubscriberを使用します。
最初は非ブロッキングで、2 番目のブロックはPublisherが完了するのを待機しています。
// Create a publisher Publisher<Document> publisher = collection.find(); // Non-blocking publisher.subscribe(new PrintDocumentSubscriber()); Subscriber<Document> subscriber = new PrintDocumentSubscriber(); publisher.subscribe(subscriber); subscriber.await(); // Block for the publisher to complete
出版社、サブスクリプション、サブスクリプション
一般に、 Publisher 、 Subscriber 、 Subscription型は低レベル API を構成し、これらのインターフェースのみを使用するのではなく、ユーザーとライブラリがより表現性の高い API を構築することが予想されます。 これらのインターフェースのみを実装するライブラリとして、ユーザーはこの増加するエコシステムのメリットを得ます。これは、リアクティブなストリームの主要設計原則です。