O driver Scala é um driver assíncrono e não bloqueante. Ao implementar o modelo Observable
, eventos assíncronos se tornam operações simples e compostas que estão livres da complexidade das chamadas de resposta aninhadas.
Para operações assíncronas, existem três interfaces:
Observação
O driver é baseado no driver MongoDB Reactive Streams e é uma implementação da especificação de reactive streams. Observable
é uma implementação de Publisher
e Observer
é uma implementação de Subscriber
.
Aplicam-se as seguintes convenções de nomenclatura de classe:
Observable
: implementação personalizada de umPublisher
Observer
: implementação personalizada de umSubscriber
Subscription
Observável
O Observable
é uma implementação do Publisher
estendida e, em geral, representa uma operação MongoDB que emite seus resultados para o Observer
baseado em uma solicitação do Subscription
para o Observable
.
Importante
Observable
pode ser considerado uma função parcial. Como nas funções parciais, nada acontece até que elas sejam chamadas. Um Observable
pode ser inscrito várias vezes, e cada inscrição pode causar novos efeitos colaterais, como fazer query no MongoDB ou inserir dados.
Observável única
The SingleObservable trait is a Publisher
implementation that returns only a single item. It can be used in the same way as an ordinary Observable
.
inscrição
Um Subscription
representa um ciclo de vida individual de um Observer
assinando um Observable
. Um Subscription
a um Observable
só pode ser usado por um único Observer
. O objetivo de um Subscription
é controlar a demanda e permitir o cancelamento da assinatura do Observable
.
Observer
Um Observer
fornece o mecanismo para receber notificações baseadas em push do Observable
. A demanda por esses eventos é sinalizada por seu Subscription
.
Após a inscrição de um Observable[TResult]
, o Observer
será passado para o Subscription
embora o método onSubscribe(subscription:
Subscription)
. A demanda por resultados é sinalizada por meio do Subscription
e quaisquer resultados são passados para o método onNext(result:
TResult)
. Se houver um erro por qualquer motivo, o método onError(e:
Throwable)
será chamado e mais nenhum evento será passado para Observer
. Como alternativa, quando Observer
tiver consumido todos os resultados de Observable
, o método onComplete()
será chamado.
Contrapressão
No exemplo a seguir, o Subscription
é usado para controlar a demanda ao iterar um Observable
. A implementação padrão do Observer
solicita automaticamente todos os dados. Abaixo, substituímos o método personalizado onSubscribe()
para que possamos gerenciar a iteração orientada pela demanda do Observable
:
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
Auxiliares observáveis
O pacote org.mongodb.scala
fornece interação aprimorada com tipos Publisher
. A funcionalidade estendida inclui assinatura simples por meio de funções anônimas:
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
O pacote org.mongodb.scala
inclui uma classe implícita que também fornece os seguintes operadores monádicas para simplificar o encadeamento e o trabalho com instâncias Publisher
ou Observable
:
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
A seguinte lista descreve os operadores Monádica disponíveis:
andThen
: permite a cadeia de instâncias doObservable
.collect
: coleta todos os resultados em uma sequência.fallbackTo
: permite voltar para umaObservable
alternativa se houver uma falha.filter
: filtra os resultados deObservable
.flatMap
: Cria um novoObservable
aplicando uma função a cada resultado doObservable
.foldLeft
: cria um novoObservable
que contém o único resultado da função de acumulador aplicada.foreach
: Aplica uma função aplicada a cada resultado emitido.head
: retorna a head doObservable
em umFuture
.map
: Cria um novoObservable
aplicando uma função para cada resultado emitido doObservable
.observeOn
: cria um novoObservable
que utiliza umExecutionContext
específico para operações futuras.recover
: cria um novoObservable
que lidará com qualquer lançamento correspondente que esseObservable
possa conter, atribuindo a ele um valor de outroObservable
.recoverWith
: Cria um novoObservable
que lidará com qualquer lançamento correspondente que esseObservable
possa conter.toFuture
: coleta os resultados deObservable
e os converte emFuture
.transform
: cria um novoObservable
aplicando a funçãoresultFunction
a cada resultado emitido.withFilter
: fornece suporte para compreensão de instâncias doObservable
.zip
: compacta os valores deste e de outroObservable
e cria um novoObservable
contendo a tupla de seus resultados.
See the BoxedPublisher API documentation to learn more about each operator.
Observável única
Como SingleObservable[T]
retorna um único item, o método toFuture()
retorna Future[T]
da mesma forma que o método head. Há também um conversor implícito que converte um Publisher
em um SingleObservable
.