Overview
Para leer datos de MongoDB, llame al read() método en tu objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puedes usar para especificar el formato y otras configuraciones para tu operación de lectura por lote.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Utiliza |
| Utilice el método Para ver una lista de las opciones de configuración de lectura por agrupadas, consulta el Guía de Opciones de configuración de lectura por lotes. |
El siguiente ejemplo de código muestra cómo utilizar la configuración previa para leer datos de people.contacts en MongoDB:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Tip
Tipo de marco de datos
DataFrame no existe como una clase en la API de Java. Use Dataset<Row> para referenciar un DataFrame.
Para leer datos de MongoDB, llame a la función read en su objeto SparkSession. Esta función devuelve un objeto DataFrameReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura por lotes.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Utiliza |
| Utiliza el método Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes. |
El siguiente ejemplo de código muestra cómo utilizar la configuración previa para leer datos de people.contacts en MongoDB:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para leer datos de MongoDB, llama al método read en tu objeto SparkSession. Este método devuelve un objeto DataFrameReader, que puede utilizar para especificar el formato y otros ajustes de configuración para su operación de lectura por lotes.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Utiliza |
| Utiliza el método Para obtener una lista de las opciones de configuración de lectura por lotes, consulte la guía Opciones de configuración de lectura por lotes. |
El siguiente ejemplo de código muestra cómo utilizar la configuración previa para leer datos de people.contacts en MongoDB:
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Tip
Tipo de marco de datos
Un DataFrame está representado por un Dataset de objetos Row. El tipo DataFrame es un alias de Dataset[Row].
Inferencia de esquemas
Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.
Suponga que la colección de MongoDB people.contacts contiene los siguientes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Para ver el esquema inferido, utilice el método printSchema() en su objeto Dataset<Row>, como se muestra en el siguiente ejemplo:
dataFrame.printSchema();
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver los datos en DataFrame, utiliza el método show() en tu objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.show();
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.
Suponga que la colección de MongoDB people.contacts contiene los siguientes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para ver el esquema inferido, utiliza la función printSchema() en tu objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver los datos en el DataFrame, utilice la función show() en su objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
Cuando se carga un conjunto de datos o un marco de datos sin un esquema, Spark muestrea los registros para inferir el esquema de la colección.
Suponga que la colección de MongoDB people.contacts contiene los siguientes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
La siguiente operación carga datos de people.contacts e infiere el esquema del DataFrame:
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para ver el esquema inferido, utilice el método printSchema() en su objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver los datos en DataFrame, utiliza el método show() en tu objeto DataFrame, como se muestra en el siguiente ejemplo:
dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
Especificar campos conocidos con sugerencias de esquema
Puede especificar un esquema con valores de campos conocidos para utilizar durante la inferencia de esquemas, especificando la opción de configuración schemaHints. Puedes especificar la opción schemaHints en cualquiera de los siguientes formatos de Spark:
Tipo | Formato | |||
|---|---|---|---|---|
DDL |
| |||
SQL DDL |
| |||
JSON | |
El siguiente ejemplo muestra cómo especificar la opción schemaHints en cada formato usando la shell de Spark. El ejemplo especifica un campo de tipo cadena llamado "value" y un campo de tipo entero llamado "count".
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
También puedes especificar la opción schemaHints en el formato DDL Simple String, o en formato JSON usando PySpark, como se muestra en el siguiente ejemplo:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
Filtros
Al usar filtros con DataFrames o Datasets, el código subyacente del MongoDB Connector construye un pipeline de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y realizar solo el procesamiento de los datos que necesitas.
MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:
Y
IgualNullSafe
EqualTo
Mayor que
GreaterThanOrEqual
En
Es nulo
Menos que
MenorOIgual
No
o
CadenaContiene
StringEndsWith
StringStartsWith
Puede usar Expresiones de agregación de Java para filtrar sus datos.
Considere una colección llamada fruit que contiene los siguientes documentos:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
Primero, cree un DataFrame para conectar con su fuente de datos por defecto de MongoDB:
Dataset<Row> df = spark.read() .format("mongodb") .option("database", "food") .option("collection", "fruit") .load();
El siguiente ejemplo recupera solo los registros en los que el valor del campo qty es mayor o igual a 10:
df.filter(df.col("qty").gte(10))
La operación produce lo siguiente:
{ "_id" : 2, "qty" : 10.0, "type" : "orange" } { "_id" : 3, "qty" : 15.0, "type" : "banana" }
Al usar filtros con DataFrames o Datasets, el código subyacente del MongoDB Connector construye un pipeline de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y realizar solo el procesamiento de los datos que necesitas.
MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:
Y
IgualNullSafe
EqualTo
Mayor que
GreaterThanOrEqual
En
Es nulo
Menos que
MenorOIgual
No
o
CadenaContiene
StringEndsWith
StringStartsWith
Utilice filter() para leer un subconjunto de datos de su colección MongoDB.
Considere una colección llamada fruit que contiene los siguientes documentos:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
Primero, configura un objeto DataFrame para conectarte con tu fuente de datos MongoDB por defecto:
df = spark.read.format("mongodb").load()
El siguiente ejemplo incluye solo los registros en los que el campo qty es mayor o igual a 10.
df.filter(df['qty'] >= 10).show()
La operación imprime la siguiente salida:
{ "_id" : 2, "qty" : 10.0, "type" : "orange" } { "_id" : 3, "qty" : 15.0, "type" : "banana" }
Al usar filtros con DataFrames o Datasets, el código subyacente del MongoDB Connector construye un pipeline de agregación para filtrar los datos en MongoDB antes de enviarlos a Spark. Esto mejora el rendimiento de Spark al recuperar y realizar solo el procesamiento de los datos que necesitas.
MongoDB Spark Connector convierte los siguientes filtros en etapas del pipeline de agregación:
Y
IgualNullSafe
EqualTo
Mayor que
GreaterThanOrEqual
En
Es nulo
Menos que
MenorOIgual
No
o
CadenaContiene
StringEndsWith
StringStartsWith
El siguiente ejemplo filtra y saca los caracteres con edades menores a 100:
df.filter(df("age") < 100).show()
La operación produce lo siguiente:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
Consultas SQL
Antes de ejecutar consultas SQL en tu Dataset, debes registrar una vista temporal para el Dataset.
La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o más antiguos:
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show() genera lo siguiente:
{ "name" : "Gandalf", "age" : 1000 } { "name" : "Thorin", "age" : 195 } { "name" : "Balin", "age" : 178 } { "name" : "Dwalin", "age" : 169 } { "name" : "Óin", "age" : 167 } { "name" : "Glóin", "age" : 158 }
Antes de poder ejecutar consultas SQL en tu DataFrame, necesitas registrar una tabla temporal.
El siguiente ejemplo registra una tabla temporal llamada temp, luego utiliza SQL para query los registros en los que el campo type contiene la letra e:
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
En el shell pyspark, la operación imprime la siguiente salida:
{ "type" : "apple", "qty" : 5.0 } { "type" : "orange", "qty" : 10.0 }
Antes de ejecutar consultas SQL en tu Dataset, debes registrar una vista temporal para el Dataset.
La siguiente operación registra una tabla characters y luego la consulta para encontrar todos los caracteres que sean 100 o más antiguos:
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
centenarians.show() genera lo siguiente:
{ "name" : "Gandalf", "age" : 1000 } { "name" : "Thorin", "age" : 195 } { "name" : "Balin", "age" : 178 } { "name" : "Dwalin", "age" : 169 } { "name" : "Óin", "age" : 167 } { "name" : "Glóin", "age" : 158 }
Documentación de la API
Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark: