Overview
Para leer datos de MongoDB, llame al read() Método en el objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para la operación de lectura por lotes.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Use |
| Utilice el método Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la Guía deopciones de configuración de lectura por lotes. |
El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Tip
Tipo de marco de datos
DataFrame No existe como clase en la API de Java. Use Dataset<Row> para hacer referencia a un DataFrame.
Para leer datos de MongoDB, llame a la función read en su objeto SparkSession. Esta función devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura por lotes.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Use |
| Utiliza el método Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes. |
El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para leer datos de MongoDB, llame al método read en su objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura por lotes.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Use |
| Utiliza el método Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes. |
El siguiente ejemplo de código muestra cómo utilizar la configuración anterior para leer datos de people.contacts en MongoDB:
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Tip
Tipo de marco de datos
Un DataFrame se representa mediante un Dataset de Row objetos. El tipo DataFrame es un alias de Dataset[Row].
Inferencia de esquemas
Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.
Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Para ver el esquema inferido, utilice el método printSchema() en su objeto Dataset<Row>, como se muestra en el siguiente ejemplo:
dataFrame.printSchema();
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver los datos en el DataFrame, utilice el método show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.show();
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.
Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para ver el esquema inferido, utilice la función printSchema() en su objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver los datos en el DataFrame, utilice la función show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.
Supongamos que la colección MongoDB people.contacts contiene los siguientes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para ver el esquema inferido, utilice el método printSchema() en su objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver los datos en el DataFrame, utilice el método show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
Especificar campos conocidos con sugerencias de esquema
Puede especificar un esquema que contenga valores de campo conocidos para usar durante la inferencia del esquema mediante la opción de configuración schemaHints. Puede especificar la opción schemaHints en cualquiera de los siguientes formatos de Spark:
Tipo | Formato | |||
|---|---|---|---|---|
DDL |
| |||
SQL DDL |
| |||
JSON | |
El siguiente ejemplo muestra cómo especificar la opción schemaHints en cada formato mediante el shell de Spark. El ejemplo especifica un campo de cadena llamado "value" y un campo de entero llamado "count".
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
También puede especificar la opción schemaHints en el formato DDL de cadena simple o en formato JSON utilizando PySpark, como se muestra en el siguiente ejemplo:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
Filtros
Al usar filtros con DataFrames o DataSets, el código subyacente del Conector MongoDB construye una canalización de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y procesar solo los datos necesarios.
MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:
Y
IgualNullSafe
EqualTo
Mayor que
Mayor que o igual
En
Es nulo
Menos que
Menor o igual
No
O
CadenaContiene
CadenaTerminaCon
CadenaComienzaCon
Puede utilizar expresiones de agregación de Java para filtrar sus datos.
Considere una colección llamada fruit que contiene los siguientes documentos:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
Primero, crea un DataFrame para conectarte a tu fuente de datos MongoDB predeterminada:
Dataset<Row> df = spark.read() .format("mongodb") .option("database", "food") .option("collection", "fruit") .load();
El siguiente ejemplo recupera solo los registros en los que el valor del campo qty es mayor o igual a 10:
df.filter(df.col("qty").gte(10))
La operación genera el siguiente resultado:
{ "_id" : 2, "qty" : 10.0, "type" : "orange" } { "_id" : 3, "qty" : 15.0, "type" : "banana" }
Al usar filtros con DataFrames o DataSets, el código subyacente del Conector MongoDB construye una canalización de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y procesar solo los datos necesarios.
MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:
Y
IgualNullSafe
EqualTo
Mayor que
Mayor que o igual
En
Es nulo
Menos que
Menor o igual
No
O
CadenaContiene
CadenaTerminaCon
CadenaComienzaCon
Utilice filter() para leer un subconjunto de datos de su colección MongoDB.
Considere una colección llamada fruit que contiene los siguientes documentos:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
Primero, configure un objeto DataFrame para conectarse con su fuente de datos MongoDB predeterminada:
df = spark.read.format("mongodb").load()
El siguiente ejemplo incluye solo los registros en los que el campo qty es mayor o igual a 10.
df.filter(df['qty'] >= 10).show()
La operación imprime la siguiente salida:
{ "_id" : 2, "qty" : 10.0, "type" : "orange" } { "_id" : 3, "qty" : 15.0, "type" : "banana" }
Al usar filtros con DataFrames o DataSets, el código subyacente del Conector MongoDB construye una canalización de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y procesar solo los datos necesarios.
MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:
Y
IgualNullSafe
EqualTo
Mayor que
Mayor que o igual
En
Es nulo
Menos que
Menor o igual
No
O
CadenaContiene
CadenaTerminaCon
CadenaComienzaCon
El siguiente ejemplo filtra y saca los caracteres con edades menores a 100:
df.filter(df("age") < 100).show()
La operación genera el siguiente resultado:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
Consultas SQL
Antes de ejecutar consultas SQL en su conjunto de datos, debe registrar una vista temporal para el conjunto de datos.
La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o anteriores:
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show() produce lo siguiente:
{ "name" : "Gandalf", "age" : 1000 } { "name" : "Thorin", "age" : 195 } { "name" : "Balin", "age" : 178 } { "name" : "Dwalin", "age" : 169 } { "name" : "Óin", "age" : 167 } { "name" : "Glóin", "age" : 158 }
Antes de poder ejecutar consultas SQL en su DataFrame, debe registrar una tabla temporal.
El siguiente ejemplo registra una tabla temporal llamada temp y luego utiliza SQL para consultar registros en los que el campo type contiene la letra e:
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
En el shell pyspark, la operación imprime la siguiente salida:
{ "type" : "apple", "qty" : 5.0 } { "type" : "orange", "qty" : 10.0 }
Antes de ejecutar consultas SQL en su conjunto de datos, debe registrar una vista temporal para el conjunto de datos.
La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o anteriores:
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
centenarians.show() produce lo siguiente:
{ "name" : "Gandalf", "age" : 1000 } { "name" : "Thorin", "age" : 195 } { "name" : "Balin", "age" : 178 } { "name" : "Dwalin", "age" : 169 } { "name" : "Óin", "age" : 167 } { "name" : "Glóin", "age" : 158 }
Documentación de la API
Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark: