Scala ドライバーは 、非同期の非ブロッキング ドライバーです。 Observable
モデルを実装することで、非同期イベントは単純で構成可能な操作になり、ネストされたコールバックの複雑さはありません。
非同期操作の場合、次の 3 つのインターフェースがあります。
注意
ドライバーはMongoDB Reactive Streams ドライバー上にビルドされ、リアクティブ ストリーム仕様の実装です。 Observable
はPublisher
の実装であり、 Observer
はSubscriber
の実装です。
次のクラス命名規則が適用されます。
Observable
: のカスタム実装Publisher
Observer
: のカスタム実装Subscriber
Subscription
Observable
Observable
は拡張Publisher
の実装であり、一般に、 Subscription
からObservable
へのリクエストに基づいてObserver
に結果を出力する MongoDB 操作を表します。
重要
Observable
は、部分的な関数と考えることができます。 部分的な関数と同様に、呼び出されるまで何も発生しません。 Observable
は複数回サブスクライブできます。サブスクライブごとに、MongoDB のクエリやデータの挿入など、新しい副作用が発生する可能性があります。
SingleObservable
シングルオブジェクトのみを返す Publisher
の実装です。通常の と同じ方法で使用できます。Observable
サブスクリプション
Subscription
は、 Observer
がObservable
にサブスクライブする 1 対 1 のライフサイクルを表します。 Observable
Subscription
は、単一のObserver
でのみ使用できます。 Subscription
の目的は、需要を制御し、 Observable
からのサブスクリプションを許可することです。
観察者
Observer
は、 Observable
からプッシュベースの通知を受信するメカニズムを提供します。 これらのイベントの需要は、 Subscription
によって示されます。
Observable[TResult]
にサブスクライブすると、 Observer
はonSubscribe(subscription:
Subscription)
メソッドを介してSubscription
に渡されます。 結果の要求はSubscription
を介してシグナルされ、すべての結果はonNext(result:
TResult)
メソッドに渡されます。 何らかの理由でエラーが発生した場合、 onError(e:
Throwable)
メソッドが呼び出され、それ以上のイベントがObserver
に渡されません。 あるいは、 Observer
がObservable
からの結果をすべて消費した場合、 onComplete()
メソッドが呼び出されます。
バックプレッシャー
次の例では、 Observable
を反復処理するときに需要を制御するためにSubscription
が使用されています。 デフォルトのObserver
実装はすべてのデータを自動的にリクエストします。 以下では、 onSubscribe()
メソッドのカスタムをオーバーライドして、 Observable
の要求に応じた反復を管理できるようにします。
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
観察可能なヘルパー
org.mongodb.scala
パッケージは、 Publisher
型との相互作用を改善します。 拡張機能には、匿名関数による単純なサブスクライブが含まれます。
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
org.mongodb.scala
パッケージには暗黙的な クラスが含まれています。このクラスでは、 Publisher
またはObservable
インスタンスのチェーンと操作を簡素化するための次の MongoDB 演算子も提供されています。
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
次のリストでは、使用可能な MongoDB 演算子を説明しています。
andThen
:Observable
インスタンスの連鎖を許可します。collect
: すべての結果をシーケンスに収集します。fallbackTo
: 障害が発生した場合に代替のObservable
にフォールバックできるようにします。filter
:Observable
の結果をフィルタリングします。flatMap
:Observable
の各結果に関数を適用して新しいObservable
を作成します。foldLeft
: 適用されたアキュムレータ関数の単一の結果を含む新しいObservable
を作成します。foreach
: 出力された結果それぞれに適用される関数を適用します。head
: 内のObservable
Future
のヘッドを返します。map
:Observable
のそれぞれの出力結果に関数を適用して、新しいObservable
を作成します。observeOn
: 将来の操作に特定のExecutionContext
を使用する新しいObservable
を作成します。recover
: 別のObservable
の値を割り当てて、このObservable
に含まれる可能性のある一致する例外を処理する新しいObservable
を作成します。recoverWith
: このObservable
に含まれる可能性のある一致するスローオブジェクトを処理する新しいObservable
を作成します。toFuture
:Observable
の結果を収集し、Future
に変換します。transform
: 出力された結果ごとにresultFunction
関数を適用して、新しいObservable
を作成します。withFilter
:Observable
インスタンスに大文字と小文字のサポートを提供します。zip
: この と別のObservable
の値を圧縮し、その結果の 1 倍を保持する新しいObservable
を作成します。
各演算子の詳細については、 BoxedPublisher APIドキュメント を参照してください。
SingleObservable
SingleObservable[T]
は 1 つのアイテムを返すため、 toFuture()
メソッドは ヘッド メソッドと同じ方法でFuture[T]
を返します。 また、 Publisher
をSingleObservable
に変換する暗黙的な変換機能もあります。