Docs Menu
Docs Home
/ /

Comenzar con el Spark Connector

  • Conocimientos básicos de MongoDB y Apache Spark. Consulte la Documentación deMongoDB, documentación de Sparky este documento técnico de MongoDB para obtener más detalles.

  • Versión de MongoDB 4.2 o posterior

  • Versión Spark 4.0 o posterior

  • Java 8 o versiones posteriores

Importante

En la versión 10.0.0 y posteriores del Conector, utilice el formato mongodb Para leer y escribir en MongoDB:

df = spark.read.format("mongodb").load()

Proporcione las dependencias de Spark Core, Spark SQL y MongoDB Spark Connector a su herramienta de gestión de dependencias.

El siguiente ejemplo muestra cómo incluir estas dependencias en un archivo Maven pom.xml:

<dependencies>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.13</artifactId>
<version>11.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>

Al especificar la configuración del conector mediante SparkSession, debe prefijar la configuración correctamente. Para obtener más información y otras opciones disponibles del conector MongoDB Spark, consulte Guíade configuración de Spark.

package com.mongodb.spark_examples;
import org.apache.spark.sql.SparkSession;
public final class GettingStarted {
public static void main(final String[] args) throws InterruptedException {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Application logic
}
}
  • El spark.mongodb.read.connection.uri especifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que conectarse (test), la colección (myCollection) de la que leer datos y la preferencia de lectura.

  • El spark.mongodb.write.connection.uri especifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se debe conectar (test) y la colección (myCollection) a la que se deben guardar los datos.

Puedes utilizar un SparkSession objeto para guardar datos en MongoDB, leer datos de MongoDB, crear conjuntos de datos y realizar operaciones SQL.

Importante

En la versión 10.0.0 y posteriores del Connector, utilizar el formato mongodb para leer de y guardar en MongoDB:

df = spark.read.format("mongodb").load()

Este tutorial utiliza el shell pyspark, pero el código también funciona con aplicaciones autónomas de Python.

Al iniciar el shell pyspark, se puede especificar:

  • la opción --packages para descargar el paquete de MongoDB Spark Connector. El siguiente paquete está disponible:

    • mongo-spark-connector

  • la opción --conf para configurar el MongoDB Spark Connector. Estos ajustes configuran el objeto SparkConf.

    Nota

    Si utilizas SparkConf para configurar el Spark Connector, debes anteponer los ajustes adecuadamente. Para obtener detalles y otras opciones disponibles de MongoDB Spark Connector, consulta la guía Configurar Spark.

El siguiente ejemplo inicia el shell pyspark desde la línea de comandos:

./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:​:11.0.0
  • El spark.mongodb.read.connection.uri especifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se va a conectar (test), la colección (myCollection) de la que se van a leer los datos y la preferencia de lectura.

  • El spark.mongodb.write.connection.uri especifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se conectará (test) y la colección (myCollection) a la que guardará los datos. Se conecta al puerto 27017 por defecto.

  • La opción packages especifica las coordenadas Maven de Spark Connector, en el formato groupId:artifactId:version.

Los ejemplos de este tutorial utilizarán esta base de datos y colección.

Nota

Cuando se inicia pyspark, se obtiene un objeto SparkSession llamado spark por defecto. En una aplicación autónoma de Python, se debe crear el objeto SparkSession explícitamente, como se muestra a continuación.

Si se especificaron las opciones de configuración spark.mongodb.read.connection.uri y spark.mongodb.write.connection.uri al iniciar pyspark, el objeto SparkSession por defecto las utiliza. Si prefiere crear su propio objeto SparkSession desde pyspark, puede utilizar SparkSession.builder y especificar diferentes opciones de configuración.

from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \
.getOrCreate()

Se puede usar un objeto SparkSession para escribir datos en MongoDB, leer datos de MongoDB, crear DataFrames y realizar operaciones SQL.

Importante

En la versión 10.0.0 y posteriores del Connector, utilizar el formato mongodb para leer de y guardar en MongoDB:

df = spark.read.format("mongodb").load()

Al iniciar la shell de Spark, especifique:

  • la opción --packages para descargar el paquete de MongoDB Spark Connector. El siguiente paquete está disponible:

    • mongo-spark-connector

  • la opción --conf para configurar el MongoDB Spark Connector. Estos ajustes configuran el objeto SparkConf.

    Nota

    Si utilizas SparkConf para configurar el Spark Connector, debes anteponer los ajustes adecuadamente. Para obtener detalles y otras opciones disponibles de MongoDB Spark Connector, consulta la guía Configurar Spark.

Por ejemplo,

./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:​:11.0.0
  • El spark.mongodb.read.connection.uri especifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se va a conectar (test), la colección (myCollection) de la que se van a leer los datos y la preferencia de lectura.

  • El spark.mongodb.write.connection.uri especifica la dirección del servidor MongoDB (127.0.0.1), la base de datos a la que se conectará (test) y la colección (myCollection) a la que guardará los datos. Se conecta al puerto 27017 por defecto.

  • La opción packages especifica las coordenadas Maven de Spark Connector, en el formato groupId:artifactId:version.

Habilitar las funciones y los implícitos específicos del conector de MongoDB para los objetos SparkSession y Dataset importando el siguiente paquete en el shell de Spark:

import com.mongodb.spark._

La conexión a MongoDB se realiza automáticamente cuando una acción de conjunto de datos requiere una lectura desde MongoDB o un guardado en MongoDB.

Proporcione las dependencias de Spark Core, Spark SQL y MongoDB Spark Connector a su herramienta de gestión de dependencias.

El siguiente extracto demuestra cómo incluir estas dependencias en un SBT archivo build.scala:

scalaVersion := "2.13",
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector_2.13" % "11.0.0",
"org.apache.spark" %% "spark-core" % "4.0.0",
"org.apache.spark" %% "spark-sql" % "4.0.0"
)

Al especificar la configuración del Connector mediante SparkSession, se debe prefijar los ajustes adecuadamente. Para obtener detalles y otras opciones disponibles de MongoDB Spark Connector, consultar la guía Configuración de Spark.

package com.mongodb
object GettingStarted {
def main(args: Array[String]): Unit = {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
}
}

Si obtiene un java.net.BindException: Can't assign requested address,

  • Verificar que no tenga otro shell de Spark ya en ejecución.

  • Intentar configurar la variable de entorno SPARK_LOCAL_IP; por ejemplo,

    export SPARK_LOCAL_IP=127.0.0.1
  • Intentar incluir la siguiente opción al iniciar la shell de Spark:

    --driver-java-options "-Djava.net.preferIPv4Stack=true"

Si aparecen errores al ejecutar los ejemplos de este tutorial, puede que se necesite borrar la caché local de Ivy (~/.ivy2/cache/org.mongodb.spark y ~/.ivy2/jars).

Las siguientes secciones describen algunas plataformas populares de terceros con las que puede integrar Spark y el Spark Connector para MongoDB.

Amazon EMR es una plataforma de clúster gestionado que se puede usar para ejecutar frameworks de big data como Spark. Para instalar Spark en un clúster EMR, consultar Introducción a Amazon EMR en la documentación de AWS.

Databricks es una plataforma de análisis para desarrollar, implementar y el uso compartido de datos para empresas. Para integrar el MongoDB Spark Connector con Databricks, consultar MongoDB en la documentación de Databricks.

Docker es una plataforma de código abierto que ayuda a los desarrolladores a compilar, compartir y ejecutar aplicaciones en contenedores.

Kubernetes es una plataforma de código abierto para la automatización de la gestión de contenedores. Para ejecutar Spark en Kubernetes, consulta Running Spark on Kubernetes en la documentación de Spark.

Volver

Overview

En esta página