Overview
En esta guía, puedes aprender cómo acceder a los resultados de las operaciones de MongoDB desde un Observable instancia.
Un Observable representa un flujo de datos emitido por una operación a lo largo del tiempo. Para acceder a estos datos, puedes crear una instancia Observer que se suscriba a Observable.
Nota
El controlador Scala se basa en el controlador Java Reactive Streams de MongoDB. La clase Observable extiende la clase Publisher de Java Reactive Streams e incluye métodos adicionales para facilitar el procesamiento de los resultados.
Cómo procesar un Observable
Para ejecutar una operación de MongoDB y procesar sus datos, debes solicitar los resultados de la operación desde un Observable. El controlador ofrece la interfaz Observable para operaciones que devuelven cualquier número de resultados. Las operaciones que no producen resultados o que producen un solo resultado, como el método findOne(), devuelven un SingleObservable[T]. La parametrización [T] corresponde al tipo de dato que gestiona el SingleObservable.
Las operaciones que pueden producir un número ilimitado de resultados devuelven una instancia del tipo Observable[T]. Algunas operaciones devuelven tipos Observable específicos que proporcionan métodos adicionales para filtrar y procesar los resultados antes de suscribirse a ellos. La siguiente lista describe algunos tipos que permiten encadenar métodos específicos de la operación al tipo Observable:
FindObservable[T]: Devuelto por el métodofind()DistinctObservable[T]: Devuelto por el métododistinct()AggregateObservable[T]: Devuelto por el métodoaggregate()
Puedes solicitar los resultados de la operación llamando al método subscribe() sobre el/la Observable de la operación. Pasa una instancia de la clase Observer como parámetro al método subscribe(). Este Observer recibe los resultados de la operación del Observable. Luego, se pueden utilizar los métodos proporcionados por la clase Observer para imprimir resultados, gestionar errores y realizar procesamiento de datos adicional.
Para obtener más información sobre el procesamiento de los resultados, consulte la siguiente documentación de la API:
Tip
Tiempo de espera observable
Puedes establecer un tiempo de espera en tu Observable para que devuelva los resultados de la query. Para obtener más información, consulta el Observables sección de la guía Límite para Tiempo de Ejecución del Servidor.
Datos de muestra
Los ejemplos de esta guía utilizan la colección restaurants en la base de datos sample_restaurants de los conjuntos de datos de muestra de Atlas. Para acceder a esta colección desde tu aplicación Scala, crea un MongoClient que se conecte a un clúster Atlas y asigna los siguientes valores a tus variables database y collection:
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
Para aprender cómo crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de ejemplo, consulta la guía MongoDB Get Started.
Utiliza callbacks para procesar resultados
Después de suscribirse a un Observable[T], puede utilizar los siguientes métodos de devolución de llamada proporcionados por la clase Observer para acceder a los resultados de la operación o gestionar errores:
onNext(result: TResult): Se llama cuando elObserverrecibe resultados nuevos. Se puede definir la lógica para el procesamiento de resultados sobrescribiendo este método.onError(e: Throwable)Se llama cuando la operación genera un error e impide queObserverreciba más datos deObservable. Puede definir la lógica de gestión de errores anulando este método.onComplete()Se llama cuando elObserverha consumido todos los resultados delObservable. Puedes realizar cualquier procesamiento final de datos anulando este método.
Las siguientes secciones muestran cómo personalizar estos métodos para procesar los resultados de operaciones de lectura y guardar desde un Observable.
Acceder a los resultados de la operación de lectura
Para acceder a los datos recuperados por una operación de lectura, crea un Observable[T] para almacenar los resultados de la operación. Después, suscríbete al observable y anule los métodos de función de retorno de la clase Observer para procesar los resultados.
Este ejemplo consulta la colección restaurants en busca de documentos cuyo valor cuisine sea "Czech". Para recuperar y procesar los resultados, el ejemplo asigna un valor Observable[Document] a la operación y realiza las siguientes acciones:
Llama al método
subscribe()para suscribirse alObservabley pasa unObservercomo parámetroSobrescribe el método
onNext()para imprimir cada documento recuperado, que son instancias deDocumentAnula el método
onError()para imprimir cualquier errorAnula los métodos
onComplete()para imprimir un mensaje después de que se procesen todos los resultados deObservable
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
Acceso a resultados de operaciones de guardado
Para acceder a los datos recuperados mediante una operación de guardar, crea un Observable[T] para almacenar los resultados de la operación. Luego, suscríbete al observable y sobrescribe los métodos de función de retorno de clase Observer para procesar los resultados.
Este ejemplo inserta documentos en la colección restaurants cuyo valor cuisine es "Nepalese". Para recuperar y procesar los resultados, el ejemplo asigna un valor Observable[InsertManyResult] a la operación y realiza las siguientes acciones:
Llama al método
subscribe()para suscribirse alObservabley pasa unObservercomo parámetroAnula el método
onNext()para imprimir el resultado de la operación de inserción, devuelto como unInsertManyResultAnula el método
onError()para imprimir cualquier errorAnula los métodos
onComplete()para imprimir un mensaje después de que se procesen todos los resultados deObservable
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
Utiliza funciones Lambda para procesar los resultados
En lugar de sobreescribir explícitamente las funciones de función de retorno de la clase Observer, puedes usar funciones lambda para procesar concisamente los resultados de la operación. Estas funciones permiten utilizar la notación de flecha => para personalizar la implementación de onNext(), onError() y onComplete().
Tip
Para aprender más sobre las funciones lambda, también conocidas como funciones anónimas, consulta la Función anónima entrada en Wikipedia.
Ejemplo
Este ejemplo query la colección restaurants para cada valor distinto del campo borough. El código se suscribe al Observable devuelto por el método distinct(), luego utiliza funciones lambda para imprimir resultados y gestionar errores:
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
Utiliza futuros para recuperar todos los resultados
Puedes suscribirte a un Observable de forma implícita y agregar sus resultados llamando al método toFuture(). Cuando llames a toFuture() en un Observable, el driver realiza las siguientes acciones:
Suscríbete a la
ObservableRecopila los elementos emitidos por
Observabley los almacena en una instanciaFuture
Luego, puedes iterar a través del Future para recuperar los resultados de la operación.
Importante
Si su Observable contiene una gran cantidad de documentos, llamar al toFuture() método consumirá una cantidad considerable de memoria. Si espera un conjunto de resultados grande, considere usar funciones de devolución de llamada o lambda para acceder a los resultados.
Ejemplo
Este ejemplo consulta la restaurants colección para documentos en los cuales el valor del campo name es "The Halal Guys". Para acceder a los resultados de la operación, el código convierte el Observable en un Future, espera a que el Future recoja cada resultado y recorre los resultados:
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
Documentación de la API
Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: