Requisitos previos
Conocimientos básicos de MongoDB y Apache Spark. Consulte la Documentación deMongoDB, documentación de Sparky este documento técnico de MongoDB para obtener más detalles.
Versión de MongoDB 4.2 o posterior
Versión Spark 4.0 o posterior
Java 8 o versiones posteriores
Cómo empezar
Importante
En la versión 10.0.0 y posteriores del Conector, utilice el formato
mongodb Para leer y escribir en MongoDB:
df = spark.read.format("mongodb").load()
Gestión de dependencias
Proporcione las dependencias de Spark Core, Spark SQL y MongoDB Spark Connector a su herramienta de gestión de dependencias.
El siguiente ejemplo muestra cómo incluir estas dependencias en un archivo Maven pom.xml:
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.13</artifactId> <version>11.0.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.13</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.13</artifactId> <version>4.0.0</version> </dependency> </dependencies>
Configuración
Al especificar la configuración del conector mediante SparkSession, debe prefijar la configuración correctamente. Para obtener más información y otras opciones disponibles del conector MongoDB Spark, consulte
Guíade configuración de Spark.
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
El
spark.mongodb.read.connection.uriespecifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que conectarse (test), la colección (myCollection) de la que leer datos y la preferencia de lectura.El
spark.mongodb.write.connection.uriespecifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se debe conectar (test) y la colección (myCollection) a la que se deben guardar los datos.
Puedes utilizar un SparkSession objeto para guardar datos en MongoDB, leer datos de MongoDB, crear conjuntos de datos y realizar operaciones SQL.
Importante
En la versión 10.0.0 y posteriores del Connector, utilizar el formato mongodb para leer de y guardar en MongoDB:
df = spark.read.format("mongodb").load()
Shell de Python Spark
Este tutorial utiliza el shell pyspark, pero el código también funciona con aplicaciones autónomas de Python.
Al iniciar el shell pyspark, se puede especificar:
la opción
--packagespara descargar el paquete de MongoDB Spark Connector. El siguiente paquete está disponible:mongo-spark-connector
la opción
--confpara configurar el MongoDB Spark Connector. Estos ajustes configuran el objetoSparkConf.Nota
Si utilizas
SparkConfpara configurar el Spark Connector, debes anteponer los ajustes adecuadamente. Para obtener detalles y otras opciones disponibles de MongoDB Spark Connector, consulta la guía Configurar Spark.
El siguiente ejemplo inicia el shell pyspark desde la línea de comandos:
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark::11.0.0
El
spark.mongodb.read.connection.uriespecifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se va a conectar (test), la colección (myCollection) de la que se van a leer los datos y la preferencia de lectura.El
spark.mongodb.write.connection.uriespecifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se conectará (test) y la colección (myCollection) a la que guardará los datos. Se conecta al puerto27017por defecto.La opción
packagesespecifica las coordenadas Maven de Spark Connector, en el formatogroupId:artifactId:version.
Los ejemplos de este tutorial utilizarán esta base de datos y colección.
Crear un SparkSession objeto
Nota
Cuando se inicia pyspark, se obtiene un objeto SparkSession llamado spark por defecto. En una aplicación autónoma de Python, se debe crear el objeto SparkSession explícitamente, como se muestra a continuación.
Si se especificaron las opciones de configuración spark.mongodb.read.connection.uri y spark.mongodb.write.connection.uri al iniciar pyspark, el objeto SparkSession por defecto las utiliza. Si prefiere crear su propio objeto SparkSession desde pyspark, puede utilizar SparkSession.builder y especificar diferentes opciones de configuración.
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
Se puede usar un objeto SparkSession para escribir datos en MongoDB, leer datos de MongoDB, crear DataFrames y realizar operaciones SQL.
Importante
En la versión 10.0.0 y posteriores del Connector, utilizar el formato mongodb para leer de y guardar en MongoDB:
df = spark.read.format("mongodb").load()
Spark Shell
Al iniciar la shell de Spark, especifique:
la opción
--packagespara descargar el paquete de MongoDB Spark Connector. El siguiente paquete está disponible:mongo-spark-connector
la opción
--confpara configurar el MongoDB Spark Connector. Estos ajustes configuran el objetoSparkConf.Nota
Si utilizas
SparkConfpara configurar el Spark Connector, debes anteponer los ajustes adecuadamente. Para obtener detalles y otras opciones disponibles de MongoDB Spark Connector, consulta la guía Configurar Spark.
Por ejemplo,
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark::11.0.0
El
spark.mongodb.read.connection.uriespecifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se va a conectar (test), la colección (myCollection) de la que se van a leer los datos y la preferencia de lectura.El
spark.mongodb.write.connection.uriespecifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se conectará (test) y la colección (myCollection) a la que guardará los datos. Se conecta al puerto27017por defecto.La opción
packagesespecifica las coordenadas Maven de Spark Connector, en el formatogroupId:artifactId:version.
Importa el paquete MongoDB Connector
Habilitar las funciones y los implícitos específicos del conector de MongoDB para los objetos SparkSession y Dataset importando el siguiente paquete en el shell de Spark:
import com.mongodb.spark._
Conéctese a MongoDB
La conexión a MongoDB se realiza automáticamente cuando una acción de conjunto de datos requiere una lectura desde MongoDB o un guardado en MongoDB.
Aplicación autónoma de Scala
Gestión de dependencias
Proporcione las dependencias de Spark Core, Spark SQL y MongoDB Spark Connector a su herramienta de gestión de dependencias.
El siguiente extracto demuestra cómo incluir estas dependencias en un SBT archivo build.scala:
scalaVersion := "2.13", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.13" % "11.0.0", "org.apache.spark" %% "spark-core" % "4.0.0", "org.apache.spark" %% "spark-sql" % "4.0.0" )
Configuración
Al especificar la configuración del Connector mediante SparkSession, se debe prefijar los ajustes adecuadamente. Para obtener detalles y otras opciones disponibles de MongoDB Spark Connector, consultar la guía Configuración de Spark.
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
Solución de problemas
Si obtiene un java.net.BindException: Can't assign requested address,
Verificar que no tenga otro shell de Spark ya en ejecución.
Intentar configurar la variable de entorno
SPARK_LOCAL_IP; por ejemplo,export SPARK_LOCAL_IP=127.0.0.1 Intentar incluir la siguiente opción al iniciar la shell de Spark:
--driver-java-options "-Djava.net.preferIPv4Stack=true"
Si aparecen errores al ejecutar los ejemplos de este tutorial, puede que se necesite borrar la caché local de Ivy (~/.ivy2/cache/org.mongodb.spark y ~/.ivy2/jars).
Integrations
Las siguientes secciones describen algunas plataformas populares de terceros con las que puede integrar Spark y el Spark Connector para MongoDB.
Amazon EMR
Amazon EMR es una plataforma de clúster gestionado que se puede usar para ejecutar frameworks de big data como Spark. Para instalar Spark en un clúster EMR, consultar Introducción a Amazon EMR en la documentación de AWS.
Databricks
Databricks es una plataforma de análisis para desarrollar, implementar y el uso compartido de datos para empresas. Para integrar el MongoDB Spark Connector con Databricks, consultar MongoDB en la documentación de Databricks.
Docker
Docker es una plataforma de código abierto que ayuda a los desarrolladores a compilar, compartir y ejecutar aplicaciones en contenedores.
Para iniciar Spark en un contenedor Docker, consultar Apache Spark en la documentación de Docker y siga los pasos proporcionados.
Para aprender cómo implementar Atlas en Docker, consultar Crear una implementación local de Atlas con Docker.
Kubernetes
Kubernetes es una plataforma de código abierto para la automatización de la gestión de contenedores. Para ejecutar Spark en Kubernetes, consulta Running Spark on Kubernetes en la documentación de Spark.