Docs 菜单
Docs 主页
/ /
开始体验

入门:Reactive Streams 和 Observables

本指南提供了有关 Scala 驱动程序及其异步 API 的背景,然后向您展示如何在 快速入门指南中使用驱动程序和 MongoDB。

注意

有关如何安装驱动程序的说明,请参阅安装指南

Scala驱动程序基于MongoDB Java Reactive Streams驱动程序构建。响应式流API由以下组件组成:

  1. Observable发布者 的自定义实施

  2. Observer:订阅服务器的自定义实施

  3. 订阅

Observable是数量可能无限的有序元素的提供商,根据从其ObserverObserver的多个实例收到的需求进行发布。

为响应对Observable.subscribe(Observer)的调用, Observer类上的方法的可能调用序列由以下协议给出:

onSubscribe onNext* (onError | onComplete)?

这意味着onSubscribe()始终会发出信号,然后按照Observer的请求,可能有无限数量的onNext()信号。 只要Subscription未取消,如果出现故障,则后跟一个onError()信号,或者当没有更多元素可用时,后跟一个onComplete()信号。

提示

要学习;了解有关响应式流的更多信息,请访问响应式流文档。

Scala 驱动程序 API 镜像Java Sync 驱动程序API 以及导致网络 I/O 返回Observable<T>类型的任何方法,其中T是操作的响应类型。

注意

Observable从API返回的所有 类型都是冷类型,这意味着在订阅它们之前不会发生任何事情。因此,仅创建 Observable 不会导致任何网络 I/O。直到您调用 Subscription.request() 方法,驱动程序才会执行操作。

此实施中的发布者是单播的。 Observable的每个Subscription都与单个MongoDB操作相关,并且Observable实例的Observer会接收自己的特定结果设立。

默认,订阅Observable后, Observer特征将立即从Observer请求所有结果。 确保Observer可以处理来自Observable的所有结果。 Observer.onSubscribe()方法的自定义实现可以保存Subscription ,以便仅在Observer具有容量时才请求数据。

在本快速入门中,我们实现了驱动程序源代码 GitHub 存储库的 Helpers.scala 文件中定义的自定义隐式辅助工具。这些助手检索并打印结果。尽管快速入门是异步代码的人为场景,但这些示例会在开始下一个示例之前区块一个示例的结果,以确保数据库的省/市/自治区。Helpers对象提供以下方法:

  • results():阻塞,直到Observable完成并返回收集的结果

  • headResult():阻塞,直到可以返回Observable的第一个结果

  • printResults():阻塞直到Observable完成,并打印每个结果

  • printHeadResult():阻塞,直到Observable的第一个结果可用,然后打印

后退

开始体验

在此页面上