Scala 드라이버는 비동기 및 비차단 드라이버입니다. Observable 모델을 구현하면 비동기 이벤트가 중첩된 콜백의 복잡성에서 벗어나 간단하고 구성 가능한 작업이 됩니다.
비동기 작업의 경우 세 가지 인터페이스가 있습니다.
참고
이 드라이버는 MongoDB Reactive Streams 드라이버 를 기반으로 구축되었으며, Reactive Streams 사양을 구현한 것입니다. Observable 은 Publisher 의 구현이고 Observer 는 Subscriber 의 구현입니다.
다음과 같은 클래스 명명 규칙이 적용됩니다.
Observable: 사용자 지정 구현PublisherObserver: 사용자 지정 구현SubscriberSubscription
관찰 가능
Observable 은(는) 확장된 Publisher 구현이며, 일반적으로 Subscription 에서 Observable 로의 요청에 따라 결과를 Observer 로 내보내는 MongoDB 작업을 나타냅니다.
중요
Observable 부분 함수로 생각할 수 있습니다. 부분 함수와 마찬가지로 호출될 때까지 아무 일도 발생하지 않습니다. Observable 는 여러 번 구독할 수 있으며, 각 구독은 MongoDB를 쿼리하거나 데이터를 삽입하는 등의 새로운 부작용을 일으킬 수 있습니다.
SingleObservable
SingleObservable 특성은 단일 항목만 반환하는 Publisher 구현 입니다. 일반 Observable와 동일한 방식으로 사용할 수 있습니다.
서브스크립션
Subscription 는 Observable 를 구독하는 Observer 의 일대일 라이프사이클을 나타냅니다. Subscription ~ Observable 는 단일 Observer 에서만 사용할 수 있습니다. Subscription 의 목적은 수요를 제어하고 Observable 의 구독 취소를 허용하는 것입니다.
관찰자
Observer 는 Observable 로부터 푸시 기반 알림을 수신하기 위한 메커니즘을 제공합니다. 이러한 이벤트에 대한 수요는 Subscription 로 표시됩니다.
Observable[TResult] 을(를) 구독하면 Observer 이(가) onSubscribe(subscription:
Subscription) 메서드를 통해 Subscription 으)로 전달됩니다. 결과에 대한 수요는 Subscription 를 통해 신호를 보내고 모든 결과는 onNext(result:
TResult) 메서드로 전달됩니다. 어떤 이유로든 오류가 발생하면 onError(e:
Throwable) 메서드가 호출되고 Observer 에 더 이상 이벤트가 전달되지 않습니다. 또는 Observer 가 Observable 의 모든 결과를 소비하면 onComplete() 메서드가 호출됩니다.
배압
다음 예제에서는 Subscription 을(를) 사용하여 Observable 을(를) 반복할 때 수요를 제어합니다. 기본 Observer 구현은 모든 데이터를 자동으로 요청합니다. 아래에서는 Observable 의 수요 기반 반복을 관리할 수 있도록 onSubscribe() 메서드 사용자 지정을 재정의합니다.
collection.find().subscribe(new Observer[Document](){   var batchSize: Long = 10   var seen: Long = 0   var subscription: Option[Subscription] = None   override def onSubscribe(subscription: Subscription): Unit = {     this.subscription = Some(subscription)     subscription.request(batchSize)   }   override def onNext(result: Document): Unit = {     println(document.toJson())     seen += 1     if (seen == batchSize) {       seen = 0       subscription.get.request(batchSize)     }   }   override def onError(e: Throwable): Unit = println(s"Error: $e")   override def onComplete(): Unit = println("Completed") }) 
관찰 가능한 헬퍼
The org.mongodb.scala package provides improved interaction with Publisher types. 확장 기능에는 익명 함수를 통한 간단한 구독 이 포함됩니다.
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()),                             (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()),                             (e: Throwable) => println(s"There was an error: $e"),                             () => println("Completed!")) 
org.mongodb.scala 패키지에는 Publisher 또는 Observable 인스턴스를 더 간단하게 연결하고 작업할 수 있도록 다음과 같은 모나딕 연산자도 제공하는 암시적 클래스가 포함되어 있습니다.
GenerateHtmlObservable().andThen({   case Success(html: String) => renderHtml(html)   case Failure(t) => renderHttp500 }) 
다음 목록에서는 사용 가능한 모나딕 연산자에 대해 설명합니다.
andThen:Observable인스턴스의 체인을 허용합니다.collect: 모든 결과를 시퀀스로 수집합니다.fallbackTo: 오류가 있는 경우 대체Observable로 대체할 수 있습니다.filter:Observable의 결과를 필터링합니다.flatMap:Observable의 각 결과에 함수를 적용하여 새Observable을 생성합니다.foldLeft: 적용된 축적자 함수의 단일 결과를 포함하는 새Observable를 만듭니다.foreach: 방출된 각 결과에 적용된 함수를 적용합니다.head:Future에 있는Observable의 헤드를 반환합니다.map:Observable의 방출된 각 결과에 함수를 적용하여 새Observable을 생성합니다.observeOn: 향후 작업을 위해 특정ExecutionContext를 사용하는 새Observable를 만듭니다.recover: 이Observable에 다른Observable값을 할당하여 포함할 수 있는 일치하는 모든 스로어블을 처리할 새Observable를 만듭니다.recoverWith: 이Observable에 포함될 수 있는 일치하는 모든 스로어블을 처리하다 할 새Observable를 만듭니다.toFuture:Observable결과를 수집하여 로Future변환합니다.transform: 방출된 각 결과에resultFunction함수를 적용하여 새Observable를 만듭니다.withFilter:Observable인스턴스에 대한 for-comprehensions 지원을 제공합니다.zip: 이 값과 다른Observable값을 압축하고 결과의 튜플을 포함하는 새Observable를 만듭니다.
각 연산자 에 대한 자세한 학습 은 BoxedPublisher API 설명서를 참조하세요.
SingleObservable
SingleObservable[T] 는 단일 항목을 반환하므로 toFuture() 메서드는 헤드 메서드와 동일한 방식으로 Future[T] 를 반환합니다. Publisher 을 SingleObservable 로 변환하는 암시적 변환기도 있습니다.