Overview
在本指南中,您可以学习;了解如何从 Observable实例访问权限MongoDB操作的结果。
Observable 表示操作随时间推移发出的数据流。 要访问权限此数据,您可以创建一个订阅 Observable 的 Observer实例。
注意
Scala驾驶员基于MongoDB Java Reactive Streams驾驶员构建。 Observable 类扩展了Java Reactive Streams 中的 Publisher 类,并包含其他有助于进程结果的便捷方法。
如何处理 Observable
要运行MongoDB操作并进程其数据,您必须从 Observable请求操作结果。驾驶员为返回任意数量结果的操作提供了 Observable 接口。不产生结果或只产生一个结果的操作(例如 findOne() 方法)会返回 SingleObservable[T]。[T] 参数化对应于 SingleObservable 处理的数据类型。
可以生成无限数量结果的操作会返回 Observable[T] 类型的实例。 某些操作会返回特定的 Observable 类型,这些类型提供了在订阅结果之前过滤和进程结果的其他方法。 以下列表描述了一些允许您将特定于操作的方法链接到 Observable 的类型:
FindObservable[T]:由find()方法返回DistinctObservable[T]:由distinct()方法返回AggregateObservable[T]:由aggregate()方法返回
您可以通过对操作的 Observable 调用 subscribe() 方法来请求操作结果。将 Observer 类的实例作为参数传递给 subscribe() 方法。此 Observer 从 Observable 接收操作结果。然后,您可以使用 Observer 类提供的方法打印结果、处理错误并执行其他数据处理。
要学习;了解有关处理结果的更多信息,请参阅以下API文档:
样本数据
restaurantssample_restaurants本指南中的示例使用Atlas示例数据集的 数据库中的 集合。要从Scala应用程序访问权限此集合,请创建一个连接到Atlas 集群的MongoClient,并将以下值分配给 database 和 collection 变量:
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅Atlas入门指南。
使用回调处理结果
订阅Observable[T] 后,您可以使用 Observer 类提供的以下回调方法来访问权限操作结果或处理错误:
onNext(result: TResult):当Observer收到新结果时调用。 您可以通过重写此方法来定义处理结果的逻辑。onError(e: Throwable):当操作生成错误并阻止Observer从Observable接收更多数据时调用。 您可以通过重写此方法来定义错误处理逻辑。onComplete():当Observer消耗完来自Observable的所有结果时调用。 您可以通过重写此方法来执行任何最终数据处理。
以下部分介绍如何自定义这些方法以进程来自 Observable 的读取和写入操作结果。
访问读取操作结果
要访问权限读取操作检索的数据,请创建 Observable[T] 来存储操作结果。 然后,订阅该可观察对象并重写 Observer 类回调方法以进程结果。
此示例查询 restaurants集合中 cuisine 值为 "Czech" 的文档。为了检索和进程结果,该示例为操作分配了 Observable[Document],并执行以下操作:
调用
subscribe()方法订阅Observable并将Observer作为参数传递覆盖
onNext()方法以打印每个检索到的文档,这些文档是Document实例覆盖
onError()方法以打印任何错误覆盖
onComplete()方法,以在处理Observable的所有结果后打印消息
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
访问写入操作结果
要访问权限写入操作检索的数据,请创建 Observable[T] 来存储操作结果。 然后,订阅该可观察对象并重写 Observer 类回调方法以进程结果。
此示例将文档插入到 cuisine 值为 "Nepalese" 的 restaurants集合中。 为了检索和进程结果,该示例为操作分配了一个 Observable[InsertManyResult] 并执行以下操作:
调用
subscribe()方法订阅Observable并将Observer作为参数传递覆盖
onNext()方法以打印插入操作的结果,以InsertManyResult形式返回覆盖
onError()方法以打印任何错误覆盖
onComplete()方法,以在处理Observable的所有结果后打印消息
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
使用Lambda函数处理结果
您可以使用Lambda函数来简洁地进程操作结果,而无需显式覆盖 Observer 类中的回调函数。 这些函数允许您使用 => 箭头表示法自定义 onNext()、onError() 和 onComplete() 的实施。
提示
要学习;了解有关Lambda函数(也称为匿名函数)的更多信息,请参阅匿名函数维基百科条目。
例子
此示例在 restaurants集合中查询 borough字段的每个不同值。 该代码订阅 distinct() 方法返回的 Observable,然后使用Lambda函数打印结果并处理错误:
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
使用Futures 检索所有结果
您可以隐式订阅Observable,并通过调用 toFuture() 方法聚合其结果。 当您对 Observable 调用 toFuture() 时,驾驶员会执行以下操作:
订阅
Observable收集
Observable发出的项并将其存储在Future实例中
然后,您可以遍历 Future 以检索操作结果。
例子
此示例查询 restaurants集合中 name字段值为 "The Halal Guys" 的文档。 为了访问权限操作结果,代码将 Observable 转换为 Future,等待 Future 收集每个结果,然后遍历这些结果:
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: