Overview
このガイドでは、ObservableインスタンスからMongoDB操作の結果にアクセスする方法を学びます。
Observable は、一定期間にわたる操作によって出力されるデータのストリームを表します。 このデータにアクセスするには、Observable をサブスクライブする Observerインスタンスを作成します。
注意
Scalaドライバーは、 MongoDB Java Reactive Streams ドライバー上に構築されています。 ObservableクラスはJava Reactive Streams の Publisherクラスを拡張し、結果処理に役立つ便利なメソッドが追加されています。
観察可能な値を処理する方法
MongoDB操作を実行してそのデータを処理するには、Observable から操作結果をリクエスト必要があります。ドライバーは、任意の数の結果を返す操作用の Observable インターフェースを提供します。結果を生成しないか、結果を 1 つ生成する操作(findOne() メソッドなど)では、SingleObservable[T] が返されます。[T]化パラメータは、SingleObservable が取り扱うデータ型に対応します。
無制限の数の結果を生成できる操作では、Observable[T] 型のインスタンスが返されます。 一部の操作では特定の Observable タイプが返され、サブスクライブ前に結果をフィルタリングして処理するための追加メソッドが提供されます。 次のリストでは、操作固有のメソッドを Observable に連鎖させることができるいくつかのタイプについて説明します。
FindObservable[T]:find()メソッドによって返されるDistinctObservable[T]:distinct()メソッドによって返されるAggregateObservable[T]:aggregate()メソッドによって返される
操作の Observable で subscribe() メソッドを呼び出すことで、操作結果をリクエストできます。Observerクラスのインスタンスを subscribe() メソッドのパラメーターとして渡します。この Observer は、Observable から操作結果を受け取ります。その後、Observerクラスが提供するメソッドを使用して、結果を出力したり、エラーを処理したり、追加のデータ処理を実行したりできます。
結果の処理の詳細については、次のAPIドキュメントを参照してください。
サンプル データ
restaurantssample_restaurantsこのガイドの例では、Atlasサンプルデータセット の データベースの コレクションを使用します。Scalaアプリケーションからこのコレクションにアクセスするには、Atlas クラスターに接続する MongoClient を作成し、database 変数と collection 変数に次の値を割り当てます。
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
MongoDB Atlasクラスターを無料で作成して、サンプルデータセットをロードする方法については、 「Atlas を使い始める」ガイドを参照してください。
コールバックを使用した結果の処理
Observable[T] にサブスクライブした後、Observerクラスが提供する次のコールバックメソッドを使用して、操作結果にアクセスしたり、エラーを処理したりできます。
onNext(result: TResult):Observerが新しい結果を受け取ったときに呼び出されます。 このメソッドをオーバーライドすることで、結果を処理するロジックを定義できます。onError(e: Throwable):操作によってエラーが生成され、ObserverがObservableからそれ以上のデータを受信できないようにされた場合に呼び出されます。 このメソッドをオーバーライドすることで、エラー処理ロジックを定義できます。onComplete():ObserverがObservableからの結果をすべて消費したときに呼び出されます。 このメソッドをオーバーライドすることで、最終データ処理を実行できます。
次のセクションでは、これらのメソッドをカスタマイズして、Observable からの読み取りおよび書込み操作の結果を処理する方法を示します。
読み取り操作の結果にアクセスする
読み取り操作によって取得されたデータにアクセスするには、操作結果を保存するための Observable[T] を作成します。 次に、 Observable をサブスクライブし、Observerクラスのコールバックメソッドをオーバーライドして結果を処理します。
この例では、cuisine 値が "Czech" であるドキュメントを restaurantsコレクションでクエリします。結果を検索して処理するために、この例では操作に Observable[Document] を割り当て、次のアクションを実行します。
subscribe()メソッドを呼び出してObservableにサブスクライブし、Observerをパラメータとして渡しますonNext()メソッドをオーバーライドして、検索された各ドキュメント(Documentインスタンスである)を出力しますエラーを出力するために
onError()メソッドをオーバーライドしますObservableからのすべての結果が処理された後にメッセージを出力するようにonComplete()メソッドをオーバーライドします
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
書込み (write) 操作の結果にアクセスする
書込み操作によって検索されたデータにアクセスするには、操作結果を保存するための Observable[T] を作成します。 次に、 Observable をサブスクライブし、Observerクラスのコールバックメソッドをオーバーライドして結果を処理します。
この例では、 cuisine 値が "Nepalese" であるドキュメントを restaurantsコレクションに挿入します。 結果を検索して処理するために、この例では操作に Observable[InsertManyResult] を割り当て、次のアクションを実行します。
subscribe()メソッドを呼び出してObservableにサブスクライブし、Observerをパラメータとして渡しますonNext()メソッドをオーバーライドして、挿入操作の結果をInsertManyResultとして返しますエラーを出力するために
onError()メソッドをオーバーライドしますObservableからのすべての結果が処理された後にメッセージを出力するようにonComplete()メソッドをオーバーライドします
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
Lambda関数を使用した結果の処理
Observerクラスのコールバック関数を明示的にオーバーライドする代わりに、Lambda 関数を使用して操作結果を簡潔に処理できます。 これらの関数を使用すると、=> 矢印表記を使用して、onNext()、onError()、onComplete() の実装をカスタマイズできます。
例
この例では、 restaurantsコレクションを boroughフィールドの個別の値ごとにクエリします。 このコードは distinct() メソッドによって返された Observable をサブスクライブし、Lambda 関数を使用して結果を出力し、エラーを処理します。
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
すべての結果を検索するための将来使用
toFuture() メソッドを呼び出すと、Observable を暗黙的にサブスクライブし、その結果を集計できます。 Observable で toFuture() を呼び出すと、ドライバーは次のアクションを実行します。
サブスクライブ
ObservableObservableが発行したアイテムを収集し、Futureインスタンスに保存します
次に、Future を反復処理して操作結果を検索できます。
重要
例
この例では、 nameフィールドの値が "The Halal Guys" であるドキュメントを restaurantsコレクションでクエリします。 操作結果にアクセスするために、コードは Observable を Future に変換し、Future が各結果を収集するまで待機して、結果を反復処理します。
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
API ドキュメント
このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。