Docs Menu
Docs Home
/ /

Acceder a los datos desde un observable

En esta guía, puede aprender cómo acceder a los resultados de las operaciones de MongoDB desde un Observable instancia.

Un Observable representa un flujo de datos emitido por una operación a lo largo del tiempo. Para acceder a estos datos, puede crear una instancia Observer que se suscriba a Observable.

Nota

El controlador Scala se basa en el controlador Java Reactive Streams de MongoDB. La clase Observable extiende la clase Publisher de Java Reactive Streams e incluye métodos adicionales para facilitar el procesamiento de los resultados.

Para ejecutar una operación de MongoDB y procesar sus datos, debe solicitar los resultados de la operación a un Observable. El controlador proporciona la interfaz Observable para operaciones que devuelven cualquier número de resultados. Las operaciones que no producen ningún resultado o que solo producen uno, como el método findOne(), devuelven un SingleObservable[T]. La parametrización [T] corresponde al tipo de datos que maneja el SingleObservable.

Las operaciones que pueden producir un número ilimitado de resultados devuelven una instancia del tipo Observable[T]. Algunas operaciones devuelven tipos Observable específicos que proporcionan métodos adicionales para filtrar y procesar los resultados antes de suscribirse a ellos. La siguiente lista describe algunos tipos que permiten encadenar métodos específicos de la operación al tipo Observable:

  • FindObservable[T]: Devuelto por el método find()

  • DistinctObservable[T]: Devuelto por el método distinct()

  • AggregateObservable[T]: Devuelto por el método aggregate()

Puede solicitar los resultados de una operación llamando al método subscribe() en el Observable ​​de la operación. Pase una instancia de la clase Observer como parámetro al método subscribe(). Este Observer recibe los resultados de la operación del Observable. Después, puede usar los métodos proporcionados por la clase Observer para imprimir los resultados, gestionar errores y realizar procesamiento adicional de datos.

Para obtener más información sobre el procesamiento de los resultados, consulte la siguiente documentación de la API:

  • Observable

  • Suscripción

  • Observador

Tip

Tiempo de espera observable

Puede establecer un tiempo de espera en su Observable para devolver los resultados de la consulta. Para obtener más información, consulte SecciónObservables de la guía Limitar el tiempo de ejecución del servidor.

Los ejemplos de esta guía utilizan la restaurants colección sample_restaurants de la base de datos de los conjuntos de datos de muestra de Atlas. Para acceder a esta colección desde su aplicación Scala, cree un MongoClient que se conecte a un clúster de Atlas y asigne los siguientes valores a database las collection variables y:

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

Para aprender cómo crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulte la guía de introducción a MongoDB.

Después de suscribirse a un Observable[T], puede utilizar los siguientes métodos de devolución de llamada proporcionados por la clase Observer para acceder a los resultados de la operación o manejar errores:

  • onNext(result: TResult)Se llama cuando Observer recibe nuevos resultados. Puede definir la lógica para procesar los resultados anulando este método.

  • onError(e: Throwable)Se llama cuando la operación genera un error e impide que Observer reciba más datos de Observable. Puede definir la lógica de gestión de errores anulando este método.

  • onComplete()Se llama cuando Observer ha consumido todos los resultados de Observable. Puede realizar cualquier procesamiento final de datos anulando este método.

Las siguientes secciones muestran cómo personalizar estos métodos para procesar los resultados de las operaciones de lectura y escritura de un Observable.

Para acceder a los datos recuperados por una operación de lectura, cree un Observable[T] para almacenar los resultados. Luego, suscríbase al observable y sobrescriba los métodos de devolución de llamada de la clase Observer para procesar los resultados.

Este ejemplo consulta la colección restaurants en busca de documentos cuyo valor cuisine sea "Czech". Para recuperar y procesar los resultados, el ejemplo asigna un valor Observable[Document] a la operación y realiza las siguientes acciones:

  • Llama al método subscribe() para suscribirse al Observable y pasa un Observer como parámetro

  • Anula el método onNext() para imprimir cada documento recuperado, que son instancias Document

  • Anula el método onError() para imprimir cualquier error

  • Anula los métodos onComplete() para imprimir un mensaje después de que se procesen todos los resultados de Observable

val filter = equal("cuisine", "Czech")
val findObservable: Observable[Document] = collection.find(filter)
findObservable.subscribe(new Observer[Document] {
override def onNext(result: Document): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}),
(restaurant_id,BsonString{value='40812870'}))
Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}),
(restaurant_id,BsonString{value='41485121'}))
Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}),
(restaurant_id,BsonString{value='41569184'}))
Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}),
(restaurant_id,BsonString{value='41711983'}))
Processed all results

Para acceder a los datos recuperados por una operación de escritura, cree un Observable[T] para almacenar los resultados. Luego, suscríbase al observable y sobrescriba los métodos de devolución de llamada de la clase Observer para procesar los resultados.

Este ejemplo inserta documentos en la colección restaurants cuyo valor cuisine es "Nepalese". Para recuperar y procesar los resultados, el ejemplo asigna un valor Observable[InsertManyResult] a la operación y realiza las siguientes acciones:

  • Llama al método subscribe() para suscribirse al Observable y pasa un Observer como parámetro

  • Anula el método onNext() para imprimir el resultado de la operación de inserción, devuelto como un InsertManyResult

  • Anula el método onError() para imprimir cualquier error

  • Anula los métodos onComplete() para imprimir un mensaje después de que se procesen todos los resultados de Observable

val docs: Seq[Document] = Seq(
Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"),
Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese")
)
val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs)
insertObservable.subscribe(new Observer[InsertManyResult] {
override def onNext(result: InsertManyResult): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...},
1=BsonObjectId{value=...}}}
Processed all results

En lugar de sobrescribir explícitamente las funciones de devolución de llamada de la clase Observer, puede usar funciones lambda para procesar los resultados de las operaciones de forma concisa. Estas funciones permiten usar la notación de flecha => para personalizar la implementación de onNext(), onError() y onComplete().

Tip

Para aprender más sobre las funciones lambda, también conocidas como funciones anónimas, consulta la Función anónima entrada en Wikipedia.

Este ejemplo consulta la colección restaurants para cada valor distinto del campo borough. El código se suscribe al Observable devuelto por el método distinct() y luego utiliza funciones lambda para imprimir los resultados y gestionar los errores:

collection.distinct("borough")
.subscribe((value: String) => println(value),
(e: Throwable) => println(s"Failed: $e"),
() => println("Processed all results"))
Bronx
Brooklyn
Manhattan
Missing
Queens
Staten Island
Processed all results

Puedes suscribirte a un Observable implícitamente y agregar sus resultados llamando al método toFuture(). Al llamar a toFuture() en un Observable, el controlador realiza las siguientes acciones:

  • Suscríbete a la Observable

  • Recopila los elementos emitidos por Observable y los almacena en una instancia Future

Luego, puede iterar a través de Future para recuperar los resultados de la operación.

Importante

Si su Observable contiene una gran cantidad de documentos, llamar al toFuture() método consumirá una cantidad considerable de memoria. Si espera un conjunto de resultados grande, considere usar funciones de devolución de llamada o lambda para acceder a los resultados.

Este ejemplo consulta la colección restaurants en busca de documentos cuyo valor del campo name sea "The Halal Guys". Para acceder a los resultados de la operación, el código convierte Observable en Future, espera a que Future recopile cada resultado y los itera:

val observable = collection.find(equal("name", "The Halal Guys"))
val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50012258'}))
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50017823'}))

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API:

Volver

Agregación de datos

En esta página