O driver Scala é um driver assíncrono e não bloqueante. Ao implementar o modelo Observable , eventos assíncronos se tornam operações simples e compostas que estão livres da complexidade das chamadas de resposta aninhadas.
Para operações assíncronas, existem três interfaces:
Observação
O driver é baseado no driver MongoDB Reactive Streams e é uma implementação da especificação de reactive streams. Observable é uma implementação de Publisher e Observer é uma implementação de Subscriber.
Aplicam-se as seguintes convenções de nomenclatura de classe:
Observable: implementação personalizada de umPublisherObserver: implementação personalizada de umSubscriberSubscription
Observável
O Observable é uma implementação do Publisher estendida e, em geral, representa uma operação MongoDB que emite seus resultados para o Observer baseado em uma solicitação do Subscription para o Observable.
Importante
Observable pode ser considerado uma função parcial. Como nas funções parciais, nada acontece até que elas sejam chamadas. Um Observable pode ser inscrito várias vezes, e cada inscrição pode causar novos efeitos colaterais, como fazer query no MongoDB ou inserir dados.
Observável única
O traço SingleObservable é uma implementação do Publisher que retorna somente um único item. Ele pode ser usado da mesma forma que um Observable comum.
inscrição
Um Subscription representa um ciclo de vida individual de um Observer assinando um Observable. Um Subscription a um Observable só pode ser usado por um único Observer. O objetivo de um Subscription é controlar a demanda e permitir o cancelamento da assinatura do Observable.
Observer
Um Observer fornece o mecanismo para receber notificações baseadas em push do Observable. A demanda por esses eventos é sinalizada por seu Subscription.
Após a inscrição de um Observable[TResult], o Observer será passado para o Subscription embora o método onSubscribe(subscription:
Subscription) . A demanda por resultados é sinalizada por meio do Subscription e quaisquer resultados são passados para o método onNext(result:
TResult) . Se houver um erro por qualquer motivo, o método onError(e:
Throwable) será chamado e mais nenhum evento será passado para Observer. Como alternativa, quando Observer tiver consumido todos os resultados de Observable, o método onComplete() será chamado.
Contrapressão
No exemplo a seguir, o Subscription é usado para controlar a demanda ao iterar um Observable. A implementação padrão do Observer solicita automaticamente todos os dados. Abaixo, substituímos o método personalizado onSubscribe() para que possamos gerenciar a iteração orientada pela demanda do Observable:
collection.find().subscribe(new Observer[Document](){   var batchSize: Long = 10   var seen: Long = 0   var subscription: Option[Subscription] = None   override def onSubscribe(subscription: Subscription): Unit = {     this.subscription = Some(subscription)     subscription.request(batchSize)   }   override def onNext(result: Document): Unit = {     println(document.toJson())     seen += 1     if (seen == batchSize) {       seen = 0       subscription.get.request(batchSize)     }   }   override def onError(e: Throwable): Unit = println(s"Error: $e")   override def onComplete(): Unit = println("Completed") }) 
Auxiliares observáveis
O pacote org.mongodb.scala fornece interação aprimorada com tipos Publisher . A funcionalidade estendida inclui assinatura simples por meio de funções anônimas:
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()),                             (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()),                             (e: Throwable) => println(s"There was an error: $e"),                             () => println("Completed!")) 
O pacote org.mongodb.scala inclui uma classe implícita que também fornece os seguintes operadores monádicas para simplificar o encadeamento e o trabalho com instâncias Publisher ou Observable :
GenerateHtmlObservable().andThen({   case Success(html: String) => renderHtml(html)   case Failure(t) => renderHttp500 }) 
A seguinte lista descreve os operadores Monádica disponíveis:
andThen: permite a cadeia de instâncias doObservable.collect: coleta todos os resultados em uma sequência.fallbackTo: permite voltar para umaObservablealternativa se houver uma falha.filter: filtra os resultados deObservable.flatMap: Cria um novoObservableaplicando uma função a cada resultado doObservable.foldLeft: cria um novoObservableque contém o único resultado da função de acumulador aplicada.foreach: Aplica uma função aplicada a cada resultado emitido.head: retorna a head doObservableem umFuture.map: Cria um novoObservableaplicando uma função para cada resultado emitido doObservable.observeOn: cria um novoObservableque utiliza umExecutionContextespecífico para operações futuras.recover: cria um novoObservableque lidará com qualquer lançamento correspondente que esseObservablepossa conter, atribuindo a ele um valor de outroObservable.recoverWith: Cria um novoObservableque lidará com qualquer lançamento correspondente que esseObservablepossa conter.toFuture: coleta os resultados deObservablee os converte emFuture.transform: cria um novoObservableaplicando a funçãoresultFunctiona cada resultado emitido.withFilter: fornece suporte para compreensão de instâncias doObservable.zip: compacta os valores deste e de outroObservablee cria um novoObservablecontendo a tupla de seus resultados.
Consulte a documentação da API do BoxedPublisher para saber mais sobre cada operador.
Observável única
Como SingleObservable[T] retorna um único item, o método toFuture() retorna Future[T] da mesma forma que o método head. Há também um conversor implícito que converte um Publisher em um SingleObservable.