Docs Menu
Docs Home
/ /

Leer desde MongoDB en moda agrupar

Para leer datos de MongoDB, llame al read() Método en el objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para la operación de lectura por lotes.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

dataFrame.read.option()

Utilice el método option para configurar los ajustes de lectura por lotes, incluida la implementación de MongoDB cadena de conexión, base de datos y colección MongoDB y configuración del particionador.

Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la Guía deopciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Tip

Tipo de marco de datos

DataFrame No existe como clase en la API de Java. Use Dataset<Row> para hacer referencia a un DataFrame.

Para leer datos de MongoDB, llame a la función read en su objeto SparkSession. Esta función devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura por lotes.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

dataFrame.read.option()

Utiliza el método option para configurar los ajustes de lectura por lotes, incluyendo la implementación de MongoDB cadena de conexión, la base de datos y colección de MongoDB, y la configuración del particionador.

Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para leer datos de MongoDB, llame al método read en su objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura por lotes.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

dataFrame.read.option()

Utiliza el método option para configurar los ajustes de lectura por lotes, incluyendo la implementación de MongoDB cadena de conexión, la base de datos y colección de MongoDB, y la configuración del particionador.

Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Tip

Tipo de marco de datos

Un DataFrame se representa mediante un Dataset de Row objetos. El tipo DataFrame es un alias de Dataset[Row].

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Para ver el esquema inferido, utilice el método printSchema() en su objeto Dataset<Row>, como se muestra en el siguiente ejemplo:

dataFrame.printSchema();
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en el DataFrame, utilice el método show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show();
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver el esquema inferido, utilice la función printSchema() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en el DataFrame, utilice la función show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver el esquema inferido, utilice el método printSchema() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en el DataFrame, utilice el método show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

Puede especificar un esquema que contenga valores de campo conocidos para usar durante la inferencia del esquema mediante la opción de configuración schemaHints. Puede especificar la opción schemaHints en cualquiera de los siguientes formatos de Spark:

Tipo
Formato

DDL

<field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE>

SQL DDL

STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>

JSON

{ "type": "struct", "fields": [
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> },
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> }]}

El siguiente ejemplo muestra cómo especificar la opción schemaHints en cada formato mediante el shell de Spark. El ejemplo especifica un campo de cadena llamado "value" y un campo de entero llamado "count".

import org.apache.spark.sql.types._
val mySchema = StructType(Seq(
StructField("value", StringType),
StructField("count", IntegerType))
// Generate DDL format
mySchema.toDDL
// Generate SQL DDL format
mySchema.sql
// Generate Simple String DDL format
mySchema.simpleString
// Generate JSON format
mySchema.json

También puede especificar la opción schemaHints en el formato DDL de cadena simple o en formato JSON utilizando PySpark, como se muestra en el siguiente ejemplo:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
mySchema = StructType([
StructField('value', StringType(), True),
StructField('count', IntegerType(), True)])
# Generate Simple String DDL format
mySchema.simpleString()
# Generate JSON format
mySchema.json()

Al usar filtros con DataFrames o DataSets, el código subyacente del Conector MongoDB construye una canalización de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y procesar solo los datos necesarios.

MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:

  • Y

  • IgualNullSafe

  • EqualTo

  • Mayor que

  • Mayor que o igual

  • En

  • Es nulo

  • Menos que

  • Menor o igual

  • No

  • O

  • CadenaContiene

  • CadenaTerminaCon

  • CadenaComienzaCon

Puede utilizar expresiones de agregación de Java para filtrar sus datos.

Considere una colección llamada fruit que contiene los siguientes documentos:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

Primero, crea un DataFrame para conectarte a tu fuente de datos MongoDB predeterminada:

Dataset<Row> df = spark.read()
.format("mongodb")
.option("database", "food")
.option("collection", "fruit")
.load();

El siguiente ejemplo recupera solo los registros en los que el valor del campo qty es mayor o igual a 10:

df.filter(df.col("qty").gte(10))

La operación genera el siguiente resultado:

{ "_id" : 2, "qty" : 10.0, "type" : "orange" }
{ "_id" : 3, "qty" : 15.0, "type" : "banana" }

Al usar filtros con DataFrames o DataSets, el código subyacente del Conector MongoDB construye una canalización de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y procesar solo los datos necesarios.

MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:

  • Y

  • IgualNullSafe

  • EqualTo

  • Mayor que

  • Mayor que o igual

  • En

  • Es nulo

  • Menos que

  • Menor o igual

  • No

  • O

  • CadenaContiene

  • CadenaTerminaCon

  • CadenaComienzaCon

Utilice filter() para leer un subconjunto de datos de su colección MongoDB.

Considere una colección llamada fruit que contiene los siguientes documentos:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

Primero, configure un objeto DataFrame para conectarse con su fuente de datos MongoDB predeterminada:

df = spark.read.format("mongodb").load()

El siguiente ejemplo incluye solo los registros en los que el campo qty es mayor o igual a 10.

df.filter(df['qty'] >= 10).show()

La operación imprime la siguiente salida:

{ "_id" : 2, "qty" : 10.0, "type" : "orange" }
{ "_id" : 3, "qty" : 15.0, "type" : "banana" }

Al usar filtros con DataFrames o DataSets, el código subyacente del Conector MongoDB construye una canalización de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y procesar solo los datos necesarios.

MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:

  • Y

  • IgualNullSafe

  • EqualTo

  • Mayor que

  • Mayor que o igual

  • En

  • Es nulo

  • Menos que

  • Menor o igual

  • No

  • O

  • CadenaContiene

  • CadenaTerminaCon

  • CadenaComienzaCon

El siguiente ejemplo filtra y saca los caracteres con edades menores a 100:

df.filter(df("age") < 100).show()

La operación genera el siguiente resultado:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }

Antes de ejecutar consultas SQL en su conjunto de datos, debe registrar una vista temporal para el conjunto de datos.

La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o anteriores:

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() produce lo siguiente:

{ "name" : "Gandalf", "age" : 1000 }
{ "name" : "Thorin", "age" : 195 }
{ "name" : "Balin", "age" : 178 }
{ "name" : "Dwalin", "age" : 169 }
{ "name" : "Óin", "age" : 167 }
{ "name" : "Glóin", "age" : 158 }

Antes de poder ejecutar consultas SQL en su DataFrame, debe registrar una tabla temporal.

El siguiente ejemplo registra una tabla temporal llamada temp y luego utiliza SQL para consultar registros en los que el campo type contiene la letra e:

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

En el shell pyspark, la operación imprime la siguiente salida:

{ "type" : "apple", "qty" : 5.0 }
{ "type" : "orange", "qty" : 10.0 }

Antes de ejecutar consultas SQL en su conjunto de datos, debe registrar una vista temporal para el conjunto de datos.

La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o anteriores:

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

centenarians.show() produce lo siguiente:

{ "name" : "Gandalf", "age" : 1000 }
{ "name" : "Thorin", "age" : 195 }
{ "name" : "Balin", "age" : 178 }
{ "name" : "Dwalin", "age" : 169 }
{ "name" : "Óin", "age" : 167 }
{ "name" : "Glóin", "age" : 158 }

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark:

Volver

Moda por agrupaciones

En esta página