Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Acceder a datos de un observable

En esta guía, puedes aprender cómo acceder a los resultados de las operaciones de MongoDB desde un Observable instancia.

Un Observable representa un flujo de datos emitido por una operación a lo largo del tiempo. Para acceder a estos datos, puedes crear una instancia Observer que se suscriba a Observable.

Nota

El controlador Scala se basa en el controlador Java Reactive Streams de MongoDB. La clase Observable extiende la clase Publisher de Java Reactive Streams e incluye métodos adicionales para facilitar el procesamiento de los resultados.

Para ejecutar una operación de MongoDB y procesar sus datos, debes solicitar los resultados de la operación desde un Observable. El controlador ofrece la interfaz Observable para operaciones que devuelven cualquier número de resultados. Las operaciones que no producen resultados o que producen un solo resultado, como el método findOne(), devuelven un SingleObservable[T]. La parametrización [T] corresponde al tipo de dato que gestiona el SingleObservable.

Las operaciones que pueden producir un número ilimitado de resultados devuelven una instancia del tipo Observable[T]. Algunas operaciones devuelven tipos Observable específicos que proporcionan métodos adicionales para filtrar y procesar los resultados antes de suscribirse a ellos. La siguiente lista describe algunos tipos que permiten encadenar métodos específicos de la operación al tipo Observable:

  • FindObservable[T]: Devuelto por el método find()

  • DistinctObservable[T]: Devuelto por el método distinct()

  • AggregateObservable[T]: Devuelto por el método aggregate()

Puedes solicitar los resultados de la operación llamando al método subscribe() sobre el/la Observable de la operación. Pasa una instancia de la clase Observer como parámetro al método subscribe(). Este Observer recibe los resultados de la operación del Observable. Luego, se pueden utilizar los métodos proporcionados por la clase Observer para imprimir resultados, gestionar errores y realizar procesamiento de datos adicional.

Para obtener más información sobre el procesamiento de los resultados, consulte la siguiente documentación de la API:

  • Observable

  • Suscripción

  • Persona observadora

Tip

Tiempo de espera observable

Puedes establecer un tiempo de espera en tu Observable para que devuelva los resultados de la query. Para obtener más información, consulta el Observables sección de la guía Límite para Tiempo de Ejecución del Servidor.

Los ejemplos de esta guía utilizan la colección restaurants en la base de datos sample_restaurants de los conjuntos de datos de muestra de Atlas. Para acceder a esta colección desde tu aplicación Scala, crea un MongoClient que se conecte a un clúster Atlas y asigna los siguientes valores a tus variables database y collection:

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

Para aprender cómo crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de ejemplo, consulta la guía MongoDB Get Started.

Después de suscribirse a un Observable[T], puede utilizar los siguientes métodos de devolución de llamada proporcionados por la clase Observer para acceder a los resultados de la operación o gestionar errores:

  • onNext(result: TResult): Se llama cuando el Observer recibe resultados nuevos. Se puede definir la lógica para el procesamiento de resultados sobrescribiendo este método.

  • onError(e: Throwable)Se llama cuando la operación genera un error e impide que Observer reciba más datos de Observable. Puede definir la lógica de gestión de errores anulando este método.

  • onComplete()Se llama cuando el Observer ha consumido todos los resultados del Observable. Puedes realizar cualquier procesamiento final de datos anulando este método.

Las siguientes secciones muestran cómo personalizar estos métodos para procesar los resultados de operaciones de lectura y guardar desde un Observable.

Para acceder a los datos recuperados por una operación de lectura, crea un Observable[T] para almacenar los resultados de la operación. Después, suscríbete al observable y anule los métodos de función de retorno de la clase Observer para procesar los resultados.

Este ejemplo consulta la colección restaurants en busca de documentos cuyo valor cuisine sea "Czech". Para recuperar y procesar los resultados, el ejemplo asigna un valor Observable[Document] a la operación y realiza las siguientes acciones:

  • Llama al método subscribe() para suscribirse al Observable y pasa un Observer como parámetro

  • Sobrescribe el método onNext() para imprimir cada documento recuperado, que son instancias de Document

  • Anula el método onError() para imprimir cualquier error

  • Anula los métodos onComplete() para imprimir un mensaje después de que se procesen todos los resultados de Observable

val filter = equal("cuisine", "Czech")
val findObservable: Observable[Document] = collection.find(filter)
findObservable.subscribe(new Observer[Document] {
override def onNext(result: Document): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}),
(restaurant_id,BsonString{value='40812870'}))
Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}),
(restaurant_id,BsonString{value='41485121'}))
Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}),
(restaurant_id,BsonString{value='41569184'}))
Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}),
(restaurant_id,BsonString{value='41711983'}))
Processed all results

Para acceder a los datos recuperados mediante una operación de guardar, crea un Observable[T] para almacenar los resultados de la operación. Luego, suscríbete al observable y sobrescribe los métodos de función de retorno de clase Observer para procesar los resultados.

Este ejemplo inserta documentos en la colección restaurants cuyo valor cuisine es "Nepalese". Para recuperar y procesar los resultados, el ejemplo asigna un valor Observable[InsertManyResult] a la operación y realiza las siguientes acciones:

  • Llama al método subscribe() para suscribirse al Observable y pasa un Observer como parámetro

  • Anula el método onNext() para imprimir el resultado de la operación de inserción, devuelto como un InsertManyResult

  • Anula el método onError() para imprimir cualquier error

  • Anula los métodos onComplete() para imprimir un mensaje después de que se procesen todos los resultados de Observable

val docs: Seq[Document] = Seq(
Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"),
Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese")
)
val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs)
insertObservable.subscribe(new Observer[InsertManyResult] {
override def onNext(result: InsertManyResult): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...},
1=BsonObjectId{value=...}}}
Processed all results

En lugar de sobreescribir explícitamente las funciones de función de retorno de la clase Observer, puedes usar funciones lambda para procesar concisamente los resultados de la operación. Estas funciones permiten utilizar la notación de flecha => para personalizar la implementación de onNext(), onError() y onComplete().

Tip

Para aprender más sobre las funciones lambda, también conocidas como funciones anónimas, consulta la Función anónima entrada en Wikipedia.

Este ejemplo query la colección restaurants para cada valor distinto del campo borough. El código se suscribe al Observable devuelto por el método distinct(), luego utiliza funciones lambda para imprimir resultados y gestionar errores:

collection.distinct("borough")
.subscribe((value: String) => println(value),
(e: Throwable) => println(s"Failed: $e"),
() => println("Processed all results"))
Bronx
Brooklyn
Manhattan
Missing
Queens
Staten Island
Processed all results

Puedes suscribirte a un Observable de forma implícita y agregar sus resultados llamando al método toFuture(). Cuando llames a toFuture() en un Observable, el driver realiza las siguientes acciones:

  • Suscríbete a la Observable

  • Recopila los elementos emitidos por Observable y los almacena en una instancia Future

Luego, puedes iterar a través del Future para recuperar los resultados de la operación.

Importante

Si su Observable contiene una gran cantidad de documentos, llamar al toFuture() método consumirá una cantidad considerable de memoria. Si espera un conjunto de resultados grande, considere usar funciones de devolución de llamada o lambda para acceder a los resultados.

Este ejemplo consulta la restaurants colección para documentos en los cuales el valor del campo name es "The Halal Guys". Para acceder a los resultados de la operación, el código convierte el Observable en un Future, espera a que el Future recoja cada resultado y recorre los resultados:

val observable = collection.find(equal("name", "The Halal Guys"))
val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50012258'}))
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50017823'}))

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API:

Volver

Agregación de datos

En esta página