Pré-requisitos
Conhecimento básico de trabalho do MongoDB e Apache Spark. Consulte a documentação do MongoDB , a documentação do Spark e este whitepaper do MongoDB para obter mais detalhes.
MongoDB versão 6.0 ou posterior
Versão do Spark de 3.1 a 3.2.4
Java 8 ou posterior
Começar
Importante
Na versão 10.0.0 e posterior do Connector, use o formato mongodb para ler e escrever no MongoDB:
df = spark.read.format("mongodb").load()
Gestão de dependência
Forneça as dependências Spark Core, Spark SQL e MongoDB Spark Connector à sua ferramenta de gerenciamento de dependência.
A partir da versão 3.2.0, O Apache Spark é compatível com Scala 2.12 e 2.13. O Spark 3.1.3 e as versões anteriores são compatíveis apenas com Scala 2.12. Para fornecer suporte para ambas as versões Scala , a versão 10.2.2 do Spark Connector produz dois artefatos:
org.mongodb.spark:mongo-spark-connector_2.12:10.2.2é compilado em relação ao Scala 2.12 e é compatível com o Spark 3.1.x e superior.org.mongodb.spark:mongo-spark-connector_2.13:10.2.2é compilado em relação ao Scala 2.13 e é compatível com o Spark 3.2.x e superior.
Importante
Use o artefato Spark Connector compatível com suas versões do Scala e Spark.
O seguinte trecho de um arquivo Maven pom.xml mostra como incluir dependências compatíveis com Scala 2.12:
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.2.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> </dependency> </dependencies>
Configuração
Ao especificar a configuração do Connector via SparkSession, você deve prefixar as configurações adequadamente. MongoDB Spark Connector Para obter detalhes e outras opções disponíveis do Spark , consulte o guia Configuração do .
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
O
spark.mongodb.read.connection.uriespecifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados de dados para conectar (test) e a collection (myCollection) da qual ler os dados e a preferência de leitura.O
spark.mongodb.write.connection.uriespecifica o endereço do servidor MongoDB(127.0.0.1), o banco de dados para conectar (test) e a collection (myCollection) na qual escrever dados.
Você pode utilizar um objeto SparkSession para escrever dados no MongoDB, ler dados do MongoDB, criar conjuntos de dados e executar operações SQL.
Importante
Na versão 10.0.0 e posterior do conector, use o formato mongodb para ler e escrever no MongoDB:
df = spark.read.format("mongodb").load()
Python Spark Shell
Este tutorial usa o shell pyspark, mas o código também funciona com aplicativos Python independentes.
Ao iniciar o shell pyspark, você pode especificar:
a opção
--packagespara baixar o pacote do conector Spark do MongoDB. O seguinte pacote está disponível:mongo-spark-connector
a opção
--confpara configurar o conector Spark do MongoDB. Estas configurações definem o objetoSparkConf.Observação
Se você usar
SparkConfpara configurar o Spark Connector, deverá prefixar as configurações adequadamente. Para obter detalhes e outras opções MongoDB Spark Connector disponíveis do , consulte o guia Configurar .Spark
O exemplo a seguir inicia o shell pyspark na linha de comando:
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.2
O
spark.mongodb.read.connection.uriespecifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados para conectar (test) e a collection (myCollection) da qual ler os dados e a read preference.O
spark.mongodb.write.connection.uriespecifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados de dados para conectar (test) e a collection (myCollection) na qual escrever dados. Conecta à porta27017por padrão.A opção
packagesespecifica as coordenadas Maven do conector Spark, no formatogroupId:artifactId:version.
Os exemplos neste tutorial usarão esse banco de dados e coleção.
Criar um SparkSession objeto
Observação
Ao iniciar pyspark você obtém um objeto SparkSession chamado spark por padrão. Em um aplicativo Python autônomo, você precisa criar seu objeto SparkSession explicitamente, conforme mostrado abaixo.
Se você especificou as opções de configuração spark.mongodb.read.connection.uri e spark.mongodb.write.connection.uri ao iniciar pyspark, o objeto SparkSession padrão as utilizará. Se preferir criar seu próprio objeto SparkSession de dentro do pyspark, você pode usar SparkSession.builder e especificar diferentes opções de configuração.
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
Você pode utilizar um objeto SparkSession para gravar dados no MongoDB, ler dados do MongoDB, criar DataFrames e executar operações SQL.
Importante
Na versão 10.0.0 e posterior do conector, use o formato mongodb para ler e escrever no MongoDB:
df = spark.read.format("mongodb").load()
Shell Spark
Ao iniciar a concha Spark, especifique:
a opção
--packagespara baixar o pacote do conector Spark do MongoDB. O seguinte pacote está disponível:mongo-spark-connector
a opção
--confpara configurar o conector Spark do MongoDB. Estas configurações definem o objetoSparkConf.Observação
Se você usar
SparkConfpara configurar o Spark Connector, deverá prefixar as configurações adequadamente. Para obter detalhes e outras opções MongoDB Spark Connector disponíveis do , consulte o guia Configurar .Spark
Por exemplo,
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.2
O
spark.mongodb.read.connection.uriespecifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados para conectar (test) e a collection (myCollection) da qual ler os dados e a read preference.O
spark.mongodb.write.connection.uriespecifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados de dados para conectar (test) e a collection (myCollection) na qual escrever dados. Conecta à porta27017por padrão.A opção
packagesespecifica as coordenadas Maven do conector Spark, no formatogroupId:artifactId:version.
Importar o pacote do conector MongoDB
Habilite as funções e implicações específicas do MongoDB Connector para seus objetos SparkSession e Dataset importando o seguinte pacote no shell do Spark:
import com.mongodb.spark._
Conecte-se ao MongoDB
A conexão com o MongoDB acontece automaticamente quando uma ação do Dataset requer uma leitura do MongoDB ou uma gravação no MongoDB.
Aplicativo Scala autônomo
Gestão de dependência
Forneça as dependências Spark Core, Spark SQL e MongoDB Spark Connector à sua ferramenta de gerenciamento de dependência.
O seguinte trecho demonstra como incluir essas dependências em um arquivo SBT build.scala:
scalaVersion := "2.12", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.2.2", "org.apache.spark" %% "spark-core" % "3.3.1", "org.apache.spark" %% "spark-sql" % "3.3.1" )
Configuração
Ao especificar a configuração do conector via SparkSession, você deve prefixar as configurações adequadamente. Para informações e outras opções disponíveis do Conector MongoDB Spark, consulte o guia Configurando o Spark.
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
Solução de problemas
Se você receber um java.net.BindException: Can't assign requested address,
Verifique se você não tem outro shell do Spark já em execução.
Tente definir a variável de ambiente
SPARK_LOCAL_IP; por exemploexport SPARK_LOCAL_IP=127.0.0.1 Tente incluir a seguinte opção ao iniciar a concha Spark:
--driver-java-options "-Djava.net.preferIPv4Stack=true"
Se você tiver erros ao executar os exemplos neste tutorial, você poderá precisar limpar seu cache ívy local (~/.ivy2/cache/org.mongodb.spark e ~/.ivy2/jars).
Integrations
As seções a seguir descrevem algumas plataformas populares de terceiros com as quais você pode integrar o Spark e o conector Spark MongoDB .
Amazon ESR
O Amazon ESR é uma plataforma de cluster gerenciado que você pode usar para executar estruturas de big data como o Spark. Para instalar o Spark em um cluster ESR, consulte Introdução ao Amazon ESR na documentação do Amazon Web Services.
Databhards
Databricks é uma plataforma de análise para criar, implantar e compartilhar dados de nível empresarial. Para integrar o conector Spark MongoDB com Databricks, consulte MongoDB na documentação Databricks.
Docker
O Docker é uma plataforma de código aberto que ajuda os desenvolvedores a criar, compartilhar e executar aplicativos em contêineres.
Para iniciar o Spark em um container do Docker, consulte Apache Spark na documentação do Docker e siga as etapas fornecidas.
Para saber como implantar o Atlas no Docker, consulte Criar um sistema local do Atlas com o Docker.
Kubernetes
O Kubernetes é uma plataforma open source para automatizar o gerenciamento de containerização. Para executar o Spark no Kubernetes, consulte Executando o Spark no Kubernetes na documentação do Spark.