Overview
In this guide, you can learn how to access the results of MongoDB operations
from an Observable instance.
An Observable represents a stream of data emitted by an operation
over time. To access this data, you can create an Observer instance
that subscribes to the Observable.
Note
The Scala driver is built upon the MongoDB Java Reactive Streams driver.
The Observable class extends the Publisher class from Java Reactive
Streams and includes additional convenience methods to help process results.
How to Process an Observable
To run a MongoDB operation and process its data, you must request
the operation results from an Observable. The driver provides the
Observable interface for operations that return any number of
results. Operations that produce no results or one result, such as the
findOne() method, return a SingleObservable[T]. The [T]
parameterization corresponds to the data type that the SingleObservable
handles.
Operations that can produce an unbounded number of results return an instance of the
Observable[T] type. Some operations return specific Observable types that
provide additional methods to filter and process results before subscribing to them.
The following list describes some types that allow you to chain operation-specific
methods to the Observable:
FindObservable[T]: Returned by thefind()methodDistinctObservable[T]: Returned by thedistinct()methodAggregateObservable[T]: Returned by theaggregate()method
You can request operation results by calling the subscribe() method
on the operation's Observable. Pass an instance of the Observer
class as a parameter to the subscribe() method. This Observer receives
the operation results from the Observable. Then, you can use methods
provided by the Observer class to print results, handle errors, and
perform additional data processing.
To learn more about processing results, see the following API documentation:
Tip
Observable Timeout
You can set a timeout on your Observable to return query results.
To learn more, see the Observables section of the Limit
Server Execution Time guide.
Sample Data
The examples in this guide use the restaurants collection in the sample_restaurants
database from the Atlas sample datasets. To access this collection
from your Scala application, create a MongoClient that connects to an Atlas cluster
and assign the following values to your database and collection variables:
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with Atlas guide.
Use Callbacks to Process Results
After subscribing to an Observable[T], you can use the following
callback methods provided by the Observer class to access operation results
or handle errors:
onNext(result: TResult): Called when theObserverreceives new results. You can define logic for processing results by overriding this method.onError(e: Throwable): Called when the operation generates an error and prevents theObserverfrom receiving more data from theObservable. You can define error handling logic by overriding this method.onComplete(): Called when theObserverhas consumed all results from theObservable. You can perform any final data processing by overriding this method.
The following sections show how to customize these methods to process read and
write operation results from an Observable.
Access Read Operation Results
To access data retrieved by a read operation, create an Observable[T]
to store the operation results. Then, subscribe to the observable and
override the Observer class callback methods to process the results.
This example queries the restaurants collection for documents
in which the cuisine value is "Czech". To retrieve
and process results, the example assigns a Observable[Document]
to the operation and performs the following actions:
Calls the
subscribe()method to subscribe to theObservableand passes anObserveras a parameterOverrides the
onNext()method to print each retrieved document, which areDocumentinstancesOverrides the
onError()method to print any errorsOverrides the
onComplete()methods to print a message after all the results from theObservableare processed
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
Access Write Operation Results
To access data retrieved by a write operation, create an Observable[T]
to store the operation results. Then, subscribe to the observable and
override the Observer class callback methods to process the results.
This example inserts documents into the restaurants collection
in which the cuisine value is "Nepalese". To retrieve
and process results, the example assigns an Observable[InsertManyResult]
to the operation and performs the following actions:
Calls the
subscribe()method to subscribe to theObservableand passes anObserveras a parameterOverrides the
onNext()method to print the result of the insert operation, returned as anInsertManyResultOverrides the
onError()method to print any errorsOverrides the
onComplete()methods to print a message after all the results from theObservableare processed
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
Use Lambda Functions to Process Results
Instead of explicitly overriding the callback functions from the
Observer class, you can use lambda functions to concisely
process operation results. These functions allow you to use the
=> arrow notation to customize the implementation of onNext(),
onError(), and onComplete().
Tip
To learn more about lambda functions, also known as anonymous functions, see the Anonymous function Wikipedia entry.
Example
This example queries the restaurants collection for each
distinct value of the borough field. The code subscribes
to the Observable returned by the distinct() method, then
uses lambda functions to print results and handle errors:
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
Use Futures to Retrieve All Results
You can subscribe to an Observable implicitly and aggregate its results
by calling the toFuture() method. When you call toFuture() on
an Observable, the driver performs the following actions:
Subscribes to the
ObservableCollects the items emitted by the
Observableand stores them in aFutureinstance
Then, you can iterate through the Future to retrieve the operation
results.
Important
Example
This example queries the restaurants collection for documents
in which the value of the name field is "The Halal Guys".
To access the operation results, the code converts the Observable
to a Future, waits until the Future collects each result, and
iterates through the results:
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: