Docs Menu
Docs Home
/
Conector Spark de MongoDB
/

Leer desde MongoDB en moda agrupar

Para leer datos de MongoDB, llame al read() Método en el objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para la operación de lectura por lotes.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

dataFrame.read.option()

Utiliza el método option para configurar los ajustes de lectura por lotes, incluyendo la implementación de MongoDB cadena de conexión, la base de datos y colección de MongoDB, y la configuración del particionador.

Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la Guía deopciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Tip

Tipo de marco de datos

DataFrame No existe como clase en la API de Java. Use Dataset<Row> para hacer referencia a un DataFrame.

Para leer datos de MongoDB, llame a la función read en su objeto SparkSession. Esta función devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura por lotes.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

dataFrame.read.option()

Utiliza el método option para configurar los ajustes de lectura por lotes, incluyendo la implementación de MongoDB cadena de conexión, la base de datos y colección de MongoDB, y la configuración del particionador.

Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para leer datos de MongoDB, llame al método read en su objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura por lotes.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

dataFrame.read.option()

Utiliza el método option para configurar los ajustes de lectura por lotes, incluyendo la implementación de MongoDB cadena de conexión, la base de datos y colección de MongoDB, y la configuración del particionador.

Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Tip

Tipo de marco de datos

Un DataFrame se representa mediante un Dataset de Row objetos. El tipo DataFrame es un alias de Dataset[Row].

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Para ver el esquema inferido, utilice el método printSchema() en su objeto Dataset<Row>, como se muestra en el siguiente ejemplo:

dataFrame.printSchema();
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en el DataFrame, utilice el método show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show();
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver el esquema inferido, utilice la función printSchema() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en el DataFrame, utilice la función show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver el esquema inferido, utilice el método printSchema() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en el DataFrame, utilice el método show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Al usar filtros con DataFrames o DataSets, el código subyacente del Conector MongoDB construye una canalización de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y procesar solo los datos necesarios.

MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:

  • Y

  • IgualNullSafe

  • EqualTo

  • Mayor que

  • Mayor que o igual

  • En

  • Es nulo

  • Menos que

  • Menor o igual

  • No

  • O

  • CadenaContiene

  • CadenaTerminaCon

  • CadenaComienzaCon

Utilice filter() para leer un subconjunto de datos de su colección MongoDB.

Considere una colección llamada fruit que contiene los siguientes documentos:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

Primero, configure un objeto DataFrame para conectarse con su fuente de datos MongoDB predeterminada:

df = spark.read.format("mongodb").load()

El siguiente ejemplo incluye solo los registros en los que el campo qty es mayor o igual a 10.

df.filter(df['qty'] >= 10).show()

La operación imprime la siguiente salida:

+---+----+------+
|_id| qty| type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+

Al usar filtros con DataFrames o DataSets, el código subyacente del Conector MongoDB construye una canalización de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y procesar solo los datos necesarios.

MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:

  • Y

  • IgualNullSafe

  • EqualTo

  • Mayor que

  • Mayor que o igual

  • En

  • Es nulo

  • Menos que

  • Menor o igual

  • No

  • O

  • CadenaContiene

  • CadenaTerminaCon

  • CadenaComienzaCon

El siguiente ejemplo filtra y saca los caracteres con edades menores a 100:

df.filter(df("age") < 100).show()

La operación genera el siguiente resultado:

+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82| Fíli|
|[5755d7b4566878c9...| 77| Kíli|
+--------------------+---+-------------+

Antes de ejecutar consultas SQL en su conjunto de datos, debe registrar una vista temporal para el conjunto de datos.

La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o más antiguos:

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() produce lo siguiente:

+-------+----+
| name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Dwalin| 169|
| Óin| 167|
| Glóin| 158|
+-------+----+

Antes de poder ejecutar consultas SQL en su DataFrame, debe registrar una tabla temporal.

El siguiente ejemplo registra una tabla temporal llamada temp y luego utiliza SQL para consultar registros en los que el campo type contiene la letra e:

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

En el shell pyspark, la operación imprime la siguiente salida:

+------+----+
| type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+

Antes de ejecutar consultas SQL en su conjunto de datos, debe registrar una vista temporal para el conjunto de datos.

La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o más antiguos:

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark:

Volver

Moda por agrupaciones

En esta página