Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Leer desde MongoDB en moda agrupar

Para leer datos de MongoDB, llame al read() método en tu objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puedes usar para especificar el formato y otras configuraciones para tu operación de lectura por lote.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

dataFrame.read.option()

Utilice el método option para configurar los ajustes de lectura por lotes, incluida la implementación de MongoDB cadena de conexión, base de datos y colección MongoDB y configuración del particionador.

Para ver una lista de las opciones de configuración de lectura por agrupadas, consulta el Guía de Opciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración previa para leer datos de people.contacts en MongoDB:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Tip

Tipo de marco de datos

DataFrame no existe como una clase en la API de Java. Use Dataset<Row> para referenciar un DataFrame.

Para leer datos de MongoDB, llame a la función read en su objeto SparkSession. Esta función devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura por lotes.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

dataFrame.read.option()

Utiliza el método option para configurar los ajustes de lectura por lotes, incluyendo la implementación de MongoDB cadena de conexión, la base de datos y colección de MongoDB, y la configuración del particionador.

Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración previa para leer datos de people.contacts en MongoDB:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para leer datos de MongoDB, llama al método read en tu objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puede utilizar para especificar el formato y otros ajustes de configuración para su operación de lectura por lotes.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

dataFrame.read.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

dataFrame.read.option()

Utiliza el método option para configurar los ajustes de lectura por lotes, incluyendo la implementación de MongoDB cadena de conexión, la base de datos y colección de MongoDB, y la configuración del particionador.

Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes.

El siguiente ejemplo de código muestra cómo utilizar la configuración previa para leer datos de people.contacts en MongoDB:

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Tip

Tipo de marco de datos

Un DataFrame está representado por un Dataset de objetos Row. El tipo DataFrame es un alias de Dataset[Row].

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Suponga que la colección de MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Para ver el esquema inferido, utilice el método printSchema() en su objeto Dataset<Row>, como se muestra en el siguiente ejemplo:

dataFrame.printSchema();
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en DataFrame, utiliza el método show() en tu objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show();
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Suponga que la colección de MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver el esquema inferido, utiliza la función printSchema() en tu objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en el DataFrame, utilice la función show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.

Suponga que la colección de MongoDB people.contacts contiene los siguientes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver el esquema inferido, utilice el método printSchema() en su objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver los datos en DataFrame, utiliza el método show() en tu objeto DataFrame, como se muestra en el siguiente ejemplo:

dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

Puede especificar un esquema con valores de campos conocidos para utilizar durante la inferencia de esquemas, especificando la opción de configuración schemaHints. Puedes especificar la opción schemaHints en cualquiera de los siguientes formatos de Spark:

Tipo
Formato

DDL

<field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE>

SQL DDL

STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>

JSON

{ "type": "struct", "fields": [
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> },
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> }]}

El siguiente ejemplo muestra cómo especificar la opción schemaHints en cada formato usando la shell de Spark. El ejemplo especifica un campo de tipo cadena llamado "value" y un campo de tipo entero llamado "count".

import org.apache.spark.sql.types._
val mySchema = StructType(Seq(
StructField("value", StringType),
StructField("count", IntegerType))
// Generate DDL format
mySchema.toDDL
// Generate SQL DDL format
mySchema.sql
// Generate Simple String DDL format
mySchema.simpleString
// Generate JSON format
mySchema.json

También puedes especificar la opción schemaHints en el formato DDL Simple String, o en formato JSON usando PySpark, como se muestra en el siguiente ejemplo:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
mySchema = StructType([
StructField('value', StringType(), True),
StructField('count', IntegerType(), True)])
# Generate Simple String DDL format
mySchema.simpleString()
# Generate JSON format
mySchema.json()

Al usar filtros con DataFrames o Datasets, el código subyacente del MongoDB Connector construye un pipeline de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y realizar solo el procesamiento de los datos que necesitas.

MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:

  • Y

  • IgualNullSafe

  • EqualTo

  • Mayor que

  • GreaterThanOrEqual

  • En

  • Es nulo

  • Menos que

  • MenorOIgual

  • No

  • o

  • CadenaContiene

  • StringEndsWith

  • StringStartsWith

Puede usar Expresiones de agregación de Java para filtrar sus datos.

Considere una colección llamada fruit que contiene los siguientes documentos:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

Primero, cree un DataFrame para conectar con su fuente de datos por defecto de MongoDB:

Dataset<Row> df = spark.read()
.format("mongodb")
.option("database", "food")
.option("collection", "fruit")
.load();

El siguiente ejemplo recupera solo los registros en los que el valor del campo qty es mayor o igual a 10:

df.filter(df.col("qty").gte(10))

La operación produce lo siguiente:

{ "_id" : 2, "qty" : 10.0, "type" : "orange" }
{ "_id" : 3, "qty" : 15.0, "type" : "banana" }

Al usar filtros con DataFrames o Datasets, el código subyacente del MongoDB Connector construye un pipeline de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y realizar solo el procesamiento de los datos que necesitas.

MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:

  • Y

  • IgualNullSafe

  • EqualTo

  • Mayor que

  • GreaterThanOrEqual

  • En

  • Es nulo

  • Menos que

  • MenorOIgual

  • No

  • o

  • CadenaContiene

  • StringEndsWith

  • StringStartsWith

Utilice filter() para leer un subconjunto de datos de su colección MongoDB.

Considere una colección llamada fruit que contiene los siguientes documentos:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

Primero, configura un objeto DataFrame para conectarte con tu fuente de datos MongoDB por defecto:

df = spark.read.format("mongodb").load()

El siguiente ejemplo incluye solo los registros en los que el campo qty es mayor o igual a 10.

df.filter(df['qty'] >= 10).show()

La operación imprime la siguiente salida:

{ "_id" : 2, "qty" : 10.0, "type" : "orange" }
{ "_id" : 3, "qty" : 15.0, "type" : "banana" }

Al usar filtros con DataFrames o Datasets, el código subyacente del MongoDB Connector construye un pipeline de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y realizar solo el procesamiento de los datos que necesitas.

MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:

  • Y

  • IgualNullSafe

  • EqualTo

  • Mayor que

  • GreaterThanOrEqual

  • En

  • Es nulo

  • Menos que

  • MenorOIgual

  • No

  • o

  • CadenaContiene

  • StringEndsWith

  • StringStartsWith

El siguiente ejemplo filtra y saca los caracteres con edades menores a 100:

df.filter(df("age") < 100).show()

La operación produce lo siguiente:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }

Antes de ejecutar consultas SQL en tu Dataset, debes registrar una vista temporal para el Dataset.

La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o más antiguos:

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() genera lo siguiente:

{ "name" : "Gandalf", "age" : 1000 }
{ "name" : "Thorin", "age" : 195 }
{ "name" : "Balin", "age" : 178 }
{ "name" : "Dwalin", "age" : 169 }
{ "name" : "Óin", "age" : 167 }
{ "name" : "Glóin", "age" : 158 }

Antes de poder ejecutar consultas SQL en tu DataFrame, necesitas registrar una tabla temporal.

El siguiente ejemplo registra una tabla temporal llamada temp, luego utiliza SQL para query los registros en los que el campo type contiene la letra e:

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

En el shell pyspark, la operación imprime la siguiente salida:

{ "type" : "apple", "qty" : 5.0 }
{ "type" : "orange", "qty" : 10.0 }

Antes de ejecutar consultas SQL en tu Dataset, debes registrar una vista temporal para el Dataset.

La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o más antiguos:

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

centenarians.show() genera lo siguiente:

{ "name" : "Gandalf", "age" : 1000 }
{ "name" : "Thorin", "age" : 195 }
{ "name" : "Balin", "age" : 178 }
{ "name" : "Dwalin", "age" : 169 }
{ "name" : "Óin", "age" : 167 }
{ "name" : "Glóin", "age" : 158 }

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark:

Volver

Moda por agrupaciones

En esta página