Overview
En esta guía, puede aprender cómo acceder a los resultados de las operaciones de MongoDB desde un Observable instancia.
Un Observable representa un flujo de datos emitido por una operación a lo largo del tiempo. Para acceder a estos datos, puede crear una instancia Observer que se suscriba a Observable.
Nota
El controlador Scala se basa en el controlador Java Reactive Streams de MongoDB. La clase Observable extiende la clase Publisher de Java Reactive Streams e incluye métodos adicionales para facilitar el procesamiento de los resultados.
Cómo procesar un observable
Para ejecutar una operación de MongoDB y procesar sus datos, debe solicitar los resultados de la operación a un Observable. El controlador proporciona la interfaz Observable para operaciones que devuelven cualquier número de resultados. Las operaciones que no producen ningún resultado o que solo producen uno, como el método findOne(), devuelven un SingleObservable[T]. La parametrización [T] corresponde al tipo de datos que maneja el SingleObservable.
Las operaciones que pueden producir un número ilimitado de resultados devuelven una instancia del tipo Observable[T]. Algunas operaciones devuelven tipos Observable específicos que proporcionan métodos adicionales para filtrar y procesar los resultados antes de suscribirse a ellos. La siguiente lista describe algunos tipos que permiten encadenar métodos específicos de la operación al tipo Observable:
FindObservable[T]: Devuelto por el métodofind()DistinctObservable[T]: Devuelto por el métododistinct()AggregateObservable[T]: Devuelto por el métodoaggregate()
Puede solicitar los resultados de una operación llamando al método subscribe() en el Observable de la operación. Pase una instancia de la clase Observer como parámetro al método subscribe(). Este Observer recibe los resultados de la operación del Observable. Después, puede usar los métodos proporcionados por la clase Observer para imprimir los resultados, gestionar errores y realizar procesamiento adicional de datos.
Para obtener más información sobre el procesamiento de los resultados, consulte la siguiente documentación de la API:
Tip
Tiempo de espera observable
Puede establecer un tiempo de espera en su Observable para devolver los resultados de la consulta. Para obtener más información, consulte SecciónObservables de la guía Limitar el tiempo de ejecución del servidor.
Datos de muestra
Los ejemplos de esta guía utilizan la restaurants colección sample_restaurants de la base de datos de los conjuntos de datos de muestra de Atlas. Para acceder a esta colección desde su aplicación Scala, cree un MongoClient que se conecte a un clúster de Atlas y asigne los siguientes valores a database las collection variables y:
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
Para aprender cómo crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulte la guía de introducción a MongoDB.
Utilice devoluciones de llamadas para procesar resultados
Después de suscribirse a un Observable[T], puede utilizar los siguientes métodos de devolución de llamada proporcionados por la clase Observer para acceder a los resultados de la operación o manejar errores:
onNext(result: TResult)Se llama cuandoObserverrecibe nuevos resultados. Puede definir la lógica para procesar los resultados anulando este método.onError(e: Throwable)Se llama cuando la operación genera un error e impide queObserverreciba más datos deObservable. Puede definir la lógica de gestión de errores anulando este método.onComplete()Se llama cuandoObserverha consumido todos los resultados deObservable. Puede realizar cualquier procesamiento final de datos anulando este método.
Las siguientes secciones muestran cómo personalizar estos métodos para procesar los resultados de las operaciones de lectura y escritura de un Observable.
Acceder a los resultados de la operación de lectura
Para acceder a los datos recuperados por una operación de lectura, cree un Observable[T] para almacenar los resultados. Luego, suscríbase al observable y sobrescriba los métodos de devolución de llamada de la clase Observer para procesar los resultados.
Este ejemplo consulta la colección restaurants en busca de documentos cuyo valor cuisine sea "Czech". Para recuperar y procesar los resultados, el ejemplo asigna un valor Observable[Document] a la operación y realiza las siguientes acciones:
Llama al método
subscribe()para suscribirse alObservabley pasa unObservercomo parámetroAnula el método
onNext()para imprimir cada documento recuperado, que son instanciasDocumentAnula el método
onError()para imprimir cualquier errorAnula los métodos
onComplete()para imprimir un mensaje después de que se procesen todos los resultados deObservable
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
Acceder a los resultados de la operación de escritura
Para acceder a los datos recuperados por una operación de escritura, cree un Observable[T] para almacenar los resultados. Luego, suscríbase al observable y sobrescriba los métodos de devolución de llamada de la clase Observer para procesar los resultados.
Este ejemplo inserta documentos en la colección restaurants cuyo valor cuisine es "Nepalese". Para recuperar y procesar los resultados, el ejemplo asigna un valor Observable[InsertManyResult] a la operación y realiza las siguientes acciones:
Llama al método
subscribe()para suscribirse alObservabley pasa unObservercomo parámetroAnula el método
onNext()para imprimir el resultado de la operación de inserción, devuelto como unInsertManyResultAnula el método
onError()para imprimir cualquier errorAnula los métodos
onComplete()para imprimir un mensaje después de que se procesen todos los resultados deObservable
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
Utilice funciones Lambda para procesar resultados
En lugar de sobrescribir explícitamente las funciones de devolución de llamada de la clase Observer, puede usar funciones lambda para procesar los resultados de las operaciones de forma concisa. Estas funciones permiten usar la notación de flecha => para personalizar la implementación de onNext(), onError() y onComplete().
Tip
Para aprender más sobre las funciones lambda, también conocidas como funciones anónimas, consulta la Función anónima entrada en Wikipedia.
Ejemplo
Este ejemplo consulta la colección restaurants para cada valor distinto del campo borough. El código se suscribe al Observable devuelto por el método distinct() y luego utiliza funciones lambda para imprimir los resultados y gestionar los errores:
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
Utilice futuros para recuperar todos los resultados
Puedes suscribirte a un Observable implícitamente y agregar sus resultados llamando al método toFuture(). Al llamar a toFuture() en un Observable, el controlador realiza las siguientes acciones:
Suscríbete a la
ObservableRecopila los elementos emitidos por
Observabley los almacena en una instanciaFuture
Luego, puede iterar a través de Future para recuperar los resultados de la operación.
Importante
Si su Observable contiene una gran cantidad de documentos, llamar al toFuture() método consumirá una cantidad considerable de memoria. Si espera un conjunto de resultados grande, considere usar funciones de devolución de llamada o lambda para acceder a los resultados.
Ejemplo
Este ejemplo consulta la colección restaurants en busca de documentos cuyo valor del campo name sea "The Halal Guys". Para acceder a los resultados de la operación, el código convierte Observable en Future, espera a que Future recopile cada resultado y los itera:
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
Documentación de la API
Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: