开始使用 Spark Connector
先决条件
具备 MongoDB 和 Apache Spark 的基本使用知识。 请参阅 MongoDB 文档 、 Spark 文档 ,并参阅此 MongoDB 白皮书 以了解更多详细信息。
MongoDB 6.0 或更高版本
Spark 版本 3.1 至 3.5
Java 8 或更高版本
开始体验
重要
在 Connector 版本10.0.0及更高版本中,使用 mongodb
格式读取和写入MongoDB:
df = spark.read.format("mongodb").load()
依赖项管理
向依赖项管理工具提供 Spark Core、Spark SQL 和 MongoDB Spark Connector 依赖项。
从版本3.2.0开始, Apache Spark 同时支持 Scala 2.12和2.13 。 Spark 3.1.3和以前的版本仅支持 Scala 2 。 12 。 要为两个 Scala 版本提供支持,请使用10版本。 5 。 Spark Connector 的0会生成两个工件:
org.mongodb.spark:mongo-spark-connector_2.12:10.5.0
针对 Scala 2.12进行编译,并支持 Spark 3.1 .x 及更高版本。org.mongodb.spark:mongo-spark-connector_2.13:10.5.0
针对 Scala 2.13进行编译,并支持 Spark 3.2 .x 及更高版本。
重要
使用与您的 Scala 和 Spark 版本兼容的 Spark Connector 工件。
以下 Maven pom.xml
文件摘录显示了如何包含与 Scala 2.12兼容的依赖项:
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.5.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> </dependency> </dependencies>
配置
通过 SparkSession
指定Connector配置时,必须为设置添加适当的前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《 配置Spark 》指南。
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
spark.mongodb.read.connection.uri
指定MongoDB服务器解决(127.0.0.1
)、要连接的数据库(test
)、要从中读取数据的集合(myCollection
) 以及读取偏好(read preference)。spark.mongodb.write.connection.uri
指定 MongoDB 服务器地址 (127.0.0.1
)、要连接的数据库 (test
) 以及要写入数据的集合 (myCollection
)。
您可以使用SparkSession
对象将数据写入 MongoDB、从 MongoDB 读取数据、创建数据集以及执行 SQL 操作。
重要
在版本 10.0.0 及更高版本的 Connector 中,请使用 mongodb
格式读取和写入 MongoDB:
df = spark.read.format("mongodb").load()
Python Spark Shell
本教程使用 pyspark
shell,但该代码也适用于独立的 Python 应用程序。
在启动 pyspark
Shell 时,您可以指定:
--packages
选项以下载 MongoDB Spark Connector 包。提供了以下包:mongo-spark-connector
--conf
选项以配置 MongoDB Spark Connnector。这些设置可配置SparkConf
对象。注意
如果使用
SparkConf
配置Spark Connector,则必须为设置相应地添加前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《配置Spark 》指南。
以下示例从命令行启动 pyspark
Shell:
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.5.0
spark.mongodb.read.connection.uri
指定 MongoDB 服务器地址 (127.0.0.1
)、要连接的数据库 (test
)、要从中读取数据的集合 (myCollection
) 以及读取偏好。spark.mongodb.write.connection.uri
指定MongoDB服务器解决(127.0.0.1
)、要连接的数据库(test
) 以及要写入数据的集合(myCollection
)。 默认连接到端口27017
。packages
选项以groupId:artifactId:version
格式指定 Spark Connector 的 Maven 坐标。
本教程中的示例将使用此数据库和集合。
创建SparkSession
对象
注意
在启动 pyspark
时,您默认获得一个名为 spark
的 SparkSession
spark
对象。在独立的 Python 应用程序中,您需要显式地创建 SparkSession
对象,如下所示。
如果您在启动 pyspark
时指定 spark.mongodb.read.connection.uri
和 spark.mongodb.write.connection.uri
配置选项,则默认 SparkSession
对象将使用它们。如果您希望在 pyspark
中创建自己的 SparkSession
对象,则可以使用 SparkSession.builder
并指定不同的配置选项。
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
您可以使用 SparkSession
对象在 MongoDB 中写入和读取数据、创建 DataFrame 以及执行 SQL 操作。
重要
在版本 10.0.0 及更高版本的 Connector 中,请使用 mongodb
格式读取和写入 MongoDB:
df = spark.read.format("mongodb").load()
Spark Shell
启动 Spark Shell 时,请指定:
--packages
选项以下载 MongoDB Spark Connector 包。提供了以下包:mongo-spark-connector
--conf
选项以配置 MongoDB Spark Connnector。这些设置可配置SparkConf
对象。注意
如果使用
SparkConf
配置Spark Connector,则必须为设置相应地添加前缀。 有关详细信息和其他可用的MongoDB Spark Connector选项,请参阅《配置Spark 》指南。
例如,
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.5.0
spark.mongodb.read.connection.uri
指定 MongoDB 服务器地址 (127.0.0.1
)、要连接的数据库 (test
)、要从中读取数据的集合 (myCollection
) 以及读取偏好。spark.mongodb.write.connection.uri
指定MongoDB服务器解决(127.0.0.1
)、要连接的数据库(test
) 以及要写入数据的集合(myCollection
)。 默认连接到端口27017
。packages
选项以groupId:artifactId:version
格式指定 Spark Connector 的 Maven 坐标。
导入 MongoDB Connector 程序包
通过在 中导入以下包,为 和 对象启用 Connector 特定的函数和隐式函数:MongoDBSparkSession
Dataset
Sparkshell
import com.mongodb.spark._
连接至 MongoDB
当数据集操作需要读取 MongoDB 或写入 MongoDB 时,会自动连接到 MongoDB。
自包含 Scala 应用程序
依赖项管理
向依赖项管理工具提供 Spark Core、Spark SQL 和 MongoDB Spark Connector 依赖项。
以下摘录演示了如何将这些依赖项包含在 SBTbuild.scala
文件:
scalaVersion := "2.12", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.5.0", "org.apache.spark" %% "spark-core" % "3.3.1", "org.apache.spark" %% "spark-sql" % "3.3.1" )
配置
在通过 SparkSession
指定 Connector 配置时,您必须相应地为设置添加前缀。有关详细信息和其他可用的 MongoDB Spark Connector 选项,请参阅配置 Spark 指南。
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
故障排除
如果得到 java.net.BindException: Can't assign requested address
,
检查并确保没有其他 Spark Shell 正在运行。
尝试设置
SPARK_LOCAL_IP
环境变量;例如export SPARK_LOCAL_IP=127.0.0.1 尝试在启动 Spark Shell 时加入以下选项:
--driver-java-options "-Djava.net.preferIPv4Stack=true"
如果运行本教程中的示例时出现错误,可能需要清除本地 Ivy 缓存(~/.ivy2/cache/org.mongodb.spark
和 ~/.ivy2/jars
)。
Integrations
以下部分介绍了一些可以集成Spark和MongoDB Spark Connector的常用第三方平台。
Amazon EMR
Amazon EMR 是一个托管集群平台,可用于运行Spark等大数据框架。要在 EMR集群上安装Spark ,请参阅Amazon Web Services文档中的Amazon EMR入门。
Databricks
Databricks 是一个用于构建、部署和共享企业级数据的分析平台。要将MongoDB Spark Connector与 Databricks 集成,请参阅 Databricks 文档中的MongoDB。
Docker
Docker是一个开源平台,可帮助开发者在容器中构建、股票和运行应用程序。
要在Docker容器中启动Spark ,请参阅Docker文档中的Apache Spark ,并按照提供的步骤进行操作。
要学习;了解如何在Docker上部署Atlas ,请参阅使用Docker创建本地Atlas部署。
Kubernetes
Kubernetes是一个用于自动化容器化管理的开源平台。要在Kubernetes上运行Spark ,请参阅Spark文档中的在Kubernetes上运行Spark 。