Scala ドライバーは 、非同期の非ブロッキング ドライバーです。  Observableモデルを実装することで、非同期イベントは単純で構成可能な操作になり、ネストされたコールバックの複雑さはありません。
非同期操作の場合、次の 3 つのインターフェースがあります。
注意
ドライバーはMongoDB Reactive Streams ドライバー上にビルドされ、リアクティブ ストリーム仕様の実装です。 ObservableはPublisherの実装であり、 ObserverはSubscriberの実装です。
次のクラス命名規則が適用されます。
Observable: のカスタム実装PublisherObserver: のカスタム実装SubscriberSubscription
Observable
Observableは拡張Publisherの実装であり、一般に、 SubscriptionからObservableへのリクエストに基づいてObserverに結果を出力する MongoDB 操作を表します。
重要
Observable は、部分的な関数と考えることができます。 部分的な関数と同様に、呼び出されるまで何も発生しません。 Observableは複数回サブスクライブできます。サブスクライブごとに、MongoDB のクエリやデータの挿入など、新しい副作用が発生する可能性があります。
SingleObservable
シングルオブジェクト のみを返す Publisher の実装です。通常の Observable と同じ方法で使用できます。
サブスクリプション
Subscriptionは、 ObserverがObservableにサブスクライブする 1 対 1 のライフサイクルを表します。 Observable Subscriptionは、単一のObserverでのみ使用できます。 Subscriptionの目的は、需要を制御し、 Observableからのサブスクリプションを許可することです。
観察者
Observerは、 Observableからプッシュベースの通知を受信するメカニズムを提供します。 これらのイベントの需要は、 Subscriptionによって示されます。
Observable[TResult]にサブスクライブすると、 ObserverはonSubscribe(subscription:
Subscription)メソッドを介してSubscriptionに渡されます。 結果の要求はSubscriptionを介してシグナルされ、すべての結果はonNext(result:
TResult)メソッドに渡されます。 何らかの理由でエラーが発生した場合、 onError(e:
Throwable)メソッドが呼び出され、それ以上のイベントがObserverに渡されません。 あるいは、 ObserverがObservableからの結果をすべて消費した場合、 onComplete()メソッドが呼び出されます。
バックプレッシャー
次の例では、 Observableを反復処理するときに需要を制御するためにSubscriptionが使用されています。 デフォルトのObserver実装はすべてのデータを自動的にリクエストします。 以下では、 onSubscribe()メソッドのカスタムをオーバーライドして、 Observableの要求に応じた反復を管理できるようにします。
collection.find().subscribe(new Observer[Document](){   var batchSize: Long = 10   var seen: Long = 0   var subscription: Option[Subscription] = None   override def onSubscribe(subscription: Subscription): Unit = {     this.subscription = Some(subscription)     subscription.request(batchSize)   }   override def onNext(result: Document): Unit = {     println(document.toJson())     seen += 1     if (seen == batchSize) {       seen = 0       subscription.get.request(batchSize)     }   }   override def onError(e: Throwable): Unit = println(s"Error: $e")   override def onComplete(): Unit = println("Completed") }) 
観察可能なヘルパー
org.mongodb.scalaパッケージは、 Publisher型との相互作用を改善します。 拡張機能には、匿名関数による単純なサブスクライブが含まれます。
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()),                             (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()),                             (e: Throwable) => println(s"There was an error: $e"),                             () => println("Completed!")) 
org.mongodb.scalaパッケージには暗黙的な クラスが含まれています。このクラスでは、 PublisherまたはObservableインスタンスのチェーンと操作を簡素化するための次の MongoDB 演算子も提供されています。
GenerateHtmlObservable().andThen({   case Success(html: String) => renderHtml(html)   case Failure(t) => renderHttp500 }) 
次のリストでは、使用可能な MongoDB 演算子を説明しています。
andThen:Observableインスタンスの連鎖を許可します。collect: すべての結果をシーケンスに収集します。fallbackTo: 障害が発生した場合に代替のObservableにフォールバックできるようにします。filter:Observableの結果をフィルタリングします。flatMap:Observableの各結果に関数を適用して新しいObservableを作成します。foldLeft: 適用されたアキュムレータ関数の単一の結果を含む新しいObservableを作成します。foreach: 出力された結果それぞれに適用される関数を適用します。head: 内のObservableFutureのヘッドを返します。map:Observableのそれぞれの出力結果に関数を適用して、新しいObservableを作成します。observeOn: 将来の操作に特定のExecutionContextを使用する新しいObservableを作成します。recover: 別のObservableの値を割り当てて、このObservableに含まれる可能性のある一致する例外を処理する新しいObservableを作成します。recoverWith: このObservableに含まれる可能性のある一致するスローオブジェクトを処理する新しいObservableを作成します。toFuture:Observableの結果を収集し、Futureに変換します。transform: 出力された結果ごとにresultFunction関数を適用して、新しいObservableを作成します。withFilter:Observableインスタンスに大文字と小文字のサポートを提供します。zip: この と別のObservableの値を圧縮し、その結果の 1 倍を保持する新しいObservableを作成します。
各演算子の詳細については、 BoxedPublisher APIドキュメント を参照してください。
SingleObservable
SingleObservable[T]は 1 つのアイテムを返すため、 toFuture()メソッドは ヘッド メソッドと同じ方法でFuture[T]を返します。 また、 PublisherをSingleObservableに変換する暗黙的な変換機能もあります。