Scala 驱动程序是异步非阻塞驱动程序。 通过实现 Observable模型,异步事件变得简单、可组合的操作,并且摆脱了嵌套回调的复杂性。
对于异步操作,提供了三个接口:
注意
该驱动程序基于MongoDB Reactive Streams 驱动程序构建,是响应式流规范的实现。 Observable是Publisher的实现,而Observer是Subscriber的实现。
以下类命名规则适用:
Observable:自定义实现PublisherObserver:自定义实现SubscriberSubscription
可观察
是扩展的Observable Publisher实现,通常它表示 MongoDB 操作,该操作根据从Observer 到 的请求将其结果发送到Subscription Observable。
重要
Observable 可以视为偏函数。 与偏函数一样,在调用它们之前不会发生任何事情。 Observable可以订阅多次,每次订阅都可能导致新的副作用,例如查询 MongoDB 或插入数据。
SingleObservable
SingleObservable 特征是仅返回单个项目的 Publisher实施。它的使用方式与普通 Observable 相同。
订阅
Subscription表示订阅Observable的Observer的一对一生命周期。 Subscription的Observable 只能由单个Observer 使用。Subscription的用途是控制需求并允许取消订阅Observable 。
观察者
Observer提供了从Observable接收基于推送的通知的机制。 对这些事件的需求由其Subscription 。
订阅Observable[TResult]后,将通过onSubscribe(subscription:
Subscription)方法向Observer传递Subscription 。 对结果的需求通过Subscription ,任何结果都传递给onNext(result:
TResult)方法。 如果由于任何原因出现错误,则会调用onError(e:
Throwable)方法,并且不会再将事件传递到Observer 。 或者,当Observer消耗完来自Observable的所有结果时,将调用onComplete()方法。
背压
在以下示例中, Subscription用于在迭代Observable时控制需求。 默认的Observer实现会自动请求所有数据。 下面我们将重写onSubscribe()自定义方法,以便管理Observable的需求驱动迭代:
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
可观察助手
org.mongodb.scala包改进了与Publisher类型的交互。 扩展功能包括通过匿名函数进行简单订阅:
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
org.mongodb.scala包包含一个隐式类,该类还提供以下 Monadic 操作符,以使Publisher或Observable实例的链接和使用变得更简单:
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
以下列表描述了可用的 Monadic 操作符:
andThen:允许链接Observable实例。collect:将所有结果收集到一个序列中。fallbackTo:允许在出现故障时回退到备用Observable。filter:筛选Observable的结果。flatMap:通过对Observable的每个结果应用一个函数来创建新的Observable。foldLeft:创建一个新的Observable,其中包含应用的累加器函数的单个结果。foreach:应用应用于每个发出结果的函数。head:返回Future中Observable的头部。map:通过将函数应用于Observable的每个发出的结果来创建新的Observable。observeOn:创建一个新的Observable,它将特定的ExecutionContext用于将来的操作。recover:创建一个新的Observable,它将处理此Observable可能包含的任何匹配的可抛出对象,方法是为其分配另一个Observable的值。recoverWith:创建一个新的Observable,它将处理此Observable可能包含的任何匹配的可抛出对象。toFuture:收集Observable结果并将其转换为Future。transform:通过将resultFunction函数应用于每个发出的结果来创建新的Observable。withFilter:为Observable实例提供 for-推导式支持。zip:压缩此Observable和另一个的值,并创建一个新的Observable来保存其结果的元组。
请参阅BoxedPublisher API文档,学习;了解有关每个操作符的更多信息。
SingleObservable
由于SingleObservable[T]会返回单个项目,因此toFuture()方法会以与 head 方法相同的方式返回Future[T] 。 还有一个隐式转换器,可将Publisher转换为SingleObservable 。