前提条件
MongoDB および Apache Spark に関する基礎知識 MongoDB のドキュメント 、 Spark のドキュメント を参照してください 、詳細についてはこの MongoDB ホワイトペーパー を参照してください。
MongoDB バージョン6.0以降
Spark バージョン3.1から3.5
Java 8以降
スタートガイド
重要
Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みには、 mongodb
という形式を使用します。
df = spark.read.format("mongodb").load()
依存関係マネジメント
Spark Core、Spark SQL、MongoDB Spark Connector の依存関係を依存関係管理ツールに指定します。
バージョン3.2.0以降、 Apache Spark は Scala 2.12と2.13の両方をサポートしています。 Spark 3.1.3およびそれ以前のバージョンは、Scala 2のみをサポートしています。 12 。 両方のScalaバージョンをサポートするには、 Spark Connectorのバージョン 10.5.0 では 2 つのアーティファクトが生成されます。
org.mongodb.spark:mongo-spark-connector_2.12:10.5.0
は Scala 2.12に対してコンパイルされ、Spark 3.1 .x 以降をサポートしています。org.mongodb.spark:mongo-spark-connector_2.13:10.5.0
は Scala 2.13に対してコンパイルされ、Spark 3.2 .x 以降をサポートしています。
重要
Scala および Spark のバージョンと互換性のある Spark Connector アーティファクトを使用します。
Maven pom.xml
ファイルからの次の抽出は、Scala 2.12と互換性のある依存関係を含める方法を示しています。
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.5.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> </dependency> </dependencies>
構成
SparkSession
経由で Connector 構成を指定する場合、 設定の前に適切に設定する必要があります。 詳細とその他の利用可能な MongoDB Spark Connector オプションについては、 のSpark 構成ガイドをご覧ください。
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
spark.mongodb.read.connection.uri
では、MongoDB サーバーのアドレス(127.0.0.1
)、接続するデータベース(test
)、データを読み取るコレクション(myCollection
)と読み込み設定(read preference)を指定します。spark.mongodb.write.connection.uri
は、MongoDB サーバーアドレス(127.0.0.1
)、接続するデータベース(test
)、データを書き込むコレクション(myCollection
)を指定します。
SparkSession
オブジェクトを使用して、MongoDB へのデータの書き込み、MongoDB からのデータの読み取り、データセットの作成、SQL 操作の実行を行えます。
重要
Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みに形式mongodb
を使用します。
df = spark.read.format("mongodb").load()
Python Spark shell
このチュートリアルではpyspark
shell を使用しますが、このコードは自己完結型の Python アプリケーションでも動作します。
pyspark
shell を起動するときに、以下を指定できます。
MongoDB Spark Connector パッケージをダウンロードするには、
--packages
オプションを使用します。 次のパッケージが利用できます。mongo-spark-connector
MongoDB Spark Connector を構成するには、
--conf
オプションを使用します。 これらの設定はSparkConf
オブジェクトを構成します。注意
SparkConf
を使用してSpark Connectorを構成する場合は、 の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。
次の例では、コマンドラインからpyspark
shell を起動します。
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.5.0
spark.mongodb.read.connection.uri
では、MongoDB サーバーのアドレス(127.0.0.1
)、接続するデータベース(test
)、データを読み取るコレクション(myCollection
)と読み込み設定(read preference)を指定します。spark.mongodb.write.connection.uri
は、MongoDB サーバーアドレス(127.0.0.1
)、接続するデータベース(test
)、データを書き込むコレクション(myCollection
)を指定します。 デフォルトでポート27017
に接続します。packages
オプションは、Spark Connector の Maven 座標をgroupId:artifactId:version
形式で指定します。
このチュートリアルの例では、このデータベースとコレクションを使用します。
SparkSession
オブジェクトの作成
注意
pyspark
を起動すると、デフォルトでspark
と呼ばれるSparkSession
オブジェクトが生成されます。 スタンドアロンの Python アプリケーションでは、以下に示すようにSparkSession
オブジェクトを明示的に作成する必要があります。
pyspark
の起動時にspark.mongodb.read.connection.uri
とspark.mongodb.write.connection.uri
構成オプションを指定した場合、デフォルトのSparkSession
オブジェクトはそれらを使用します。 pyspark
内から独自のSparkSession
オブジェクトを作成する場合は、 SparkSession.builder
を使用して別の構成オプションを指定できます。
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
SparkSession
オブジェクトを使用して、MongoDB へのデータの書き込み、MongoDB からのデータの読み取り、DataFrames の作成、SQL 操作の実行を行えます。
重要
Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みに形式mongodb
を使用します。
df = spark.read.format("mongodb").load()
Spark shell
Spark shell を起動するときに、以下を指定します。
MongoDB Spark Connector パッケージをダウンロードするには、
--packages
オプションを使用します。 次のパッケージが利用できます。mongo-spark-connector
MongoDB Spark Connector を構成するには、
--conf
オプションを使用します。 これらの設定はSparkConf
オブジェクトを構成します。注意
SparkConf
を使用してSpark Connectorを構成する場合は、 の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。
たとえば、
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.5.0
spark.mongodb.read.connection.uri
では、MongoDB サーバーのアドレス(127.0.0.1
)、接続するデータベース(test
)、データを読み取るコレクション(myCollection
)と読み込み設定(read preference)を指定します。spark.mongodb.write.connection.uri
は、MongoDB サーバーアドレス(127.0.0.1
)、接続するデータベース(test
)、データを書き込むコレクション(myCollection
)を指定します。 デフォルトでポート27017
に接続します。packages
オプションは、Spark Connector の Maven 座標をgroupId:artifactId:version
形式で指定します。
MongoDB Connector パッケージのインポート
Spark shell に次のパッケージをインポートして、 オブジェクトと オブジェクトで MongoDB Connector 固有の関数と暗黙的な機能を有効にします。SparkSession
Dataset
import com.mongodb.spark._
MongoDB に接続する
MongoDB への接続は、Dataset アクションで MongoDB からの読み取りまたは MongoDB への書き込みが必要な場合に、自動的に行われます。
自己完結型の Scala アプリケーション
依存関係マネジメント
Spark Core、Spark SQL、MongoDB Spark Connector の依存関係を依存関係管理ツールに指定します。
次の図は、これらの依存関係を SFTbuild.scala
に含める方法を示しています。 ファイル:
scalaVersion := "2.12", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.5.0", "org.apache.spark" %% "spark-core" % "3.3.1", "org.apache.spark" %% "spark-sql" % "3.3.1" )
構成
SparkSession
経由で Connector 構成を指定する場合、 設定の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
トラブルシューティング
java.net.BindException: Can't assign requested address
を取得した場合、
別の Spark shell がすでに実行されていないことを確認してください。
SPARK_LOCAL_IP
環境変数を設定してみてください。例:export SPARK_LOCAL_IP=127.0.0.1 Spark shell を起動するときに、次のオプションを含めてみてください。
--driver-java-options "-Djava.net.preferIPv4Stack=true"
このチュートリアルの例の実行中にエラーが発生した場合は、ローカル Ividy キャッシュ( ~/.ivy2/cache/org.mongodb.spark
と~/.ivy2/jars
)をクリアする必要がある場合があります。
Integrations
次のセクションでは、 SparkとMongoDB Spark コネクタを統合できる一般的なサードパーティのプラットフォームについて説明します。
Amazon
Amazon EDR は、 Sparkのような大規模なデータフレームワークを実行するために使用できるマネージドクラスタープラットフォームです。EDR クラスターにSparkをインストールするには、 Amazon Web Services のドキュメントの「 Amazon EDR を使い始める 」を参照してください。
Databricks
Databricks は、エンタープライズ レベルのデータをビルド、配置、共有するための分析プラットフォームです。MongoDB Spark コネクタ をDatabricks と統合するには、Databricks ドキュメントの「 MongoDB 」を参照してください。
Docker
Docker は、開発者がコンテナ内でアプリケーションを構築、共有、実行するのに役立つオープンソース プラットフォームです。
DockerコンテナでSparkを起動するには、 DockerドキュメントのApache Sparkを参照し、提供された手順に従います。
Dockerに Atlas を配置する方法については、 「 Dockerを使用してローカル Atlas 配置の作成 」を参照してください。
Kubernetes
Kubernetes は、コンテナ化マネジメントを自動化するためのオープンソース プラットフォームです。KubernetesでSparkを実行するには、 Sparkドキュメントの「 KubernetesでのSparkの実行 」を参照してください。