Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/
Spark Connector

Spark Connector のスタートガイド

  • MongoDB および Apache Spark に関する基礎知識 MongoDB のドキュメント Spark のドキュメント を参照してください 、詳細についてはこの MongoDB ホワイトペーパー を参照してください。

  • MongoDB バージョン6.0以降

  • Spark バージョン3.1から3.5

  • Java 8以降

重要

Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みには、 mongodbという形式を使用します。

df = spark.read.format("mongodb").load()

Spark Core、Spark SQL、MongoDB Spark Connector の依存関係を依存関係管理ツールに指定します。

バージョン3.2.0以降、 Apache Spark は Scala 2.12と2.13の両方をサポートしています。 Spark 3.1.3およびそれ以前のバージョンは、Scala 2のみをサポートしています。 12 。 両方のScalaバージョンをサポートするには、 Spark Connectorのバージョン 10.5.0 では 2 つのアーティファクトが生成されます。

  • org.mongodb.spark:mongo-spark-connector_2.12:10.5.0 は Scala 2.12に対してコンパイルされ、Spark 3.1 .x 以降をサポートしています。

  • org.mongodb.spark:mongo-spark-connector_2.13:10.5.0 は Scala 2.13に対してコンパイルされ、Spark 3.2 .x 以降をサポートしています。

重要

Scala および Spark のバージョンと互換性のある Spark Connector アーティファクトを使用します。

Maven pom.xmlファイルからの次の抽出は、Scala 2.12と互換性のある依存関係を含める方法を示しています。

<dependencies>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>10.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>

SparkSession経由で Connector 構成を指定する場合、 設定の前に適切に設定する必要があります。 詳細とその他の利用可能な MongoDB Spark Connector オプションについては、 Spark 構成ガイドをご覧ください。

package com.mongodb.spark_examples;
import org.apache.spark.sql.SparkSession;
public final class GettingStarted {
public static void main(final String[] args) throws InterruptedException {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Application logic
}
}
  • spark.mongodb.read.connection.uriでは、MongoDB サーバーのアドレス( 127.0.0.1 )、接続するデータベース( test )、データを読み取るコレクション( myCollection )と読み込み設定(read preference)を指定します。

  • spark.mongodb.write.connection.uriは、MongoDB サーバーアドレス( 127.0.0.1 )、接続するデータベース( test )、データを書き込むコレクション( myCollection )を指定します。

SparkSessionオブジェクトを使用して、MongoDB へのデータの書き込み、MongoDB からのデータの読み取り、データセットの作成、SQL 操作の実行を行えます。

重要

Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みに形式mongodbを使用します。

df = spark.read.format("mongodb").load()

このチュートリアルではpyspark shell を使用しますが、このコードは自己完結型の Python アプリケーションでも動作します。

pyspark shell を起動するときに、以下を指定できます。

  • MongoDB Spark Connector パッケージをダウンロードするには、 --packagesオプションを使用します。 次のパッケージが利用できます。

    • mongo-spark-connector

  • MongoDB Spark Connector を構成するには、 --confオプションを使用します。 これらの設定はSparkConfオブジェクトを構成します。

    注意

    SparkConf を使用してSpark Connectorを構成する場合は、 の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。

次の例では、コマンドラインからpyspark shell を起動します。

./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.5.0
  • spark.mongodb.read.connection.uriでは、MongoDB サーバーのアドレス( 127.0.0.1 )、接続するデータベース( test )、データを読み取るコレクション( myCollection )と読み込み設定(read preference)を指定します。

  • spark.mongodb.write.connection.uriは、MongoDB サーバーアドレス( 127.0.0.1 )、接続するデータベース( test )、データを書き込むコレクション( myCollection )を指定します。 デフォルトでポート27017に接続します。

  • packagesオプションは、Spark Connector の Maven 座標をgroupId:artifactId:version形式で指定します。

このチュートリアルの例では、このデータベースとコレクションを使用します。

注意

pysparkを起動すると、デフォルトでsparkと呼ばれるSparkSessionオブジェクトが生成されます。 スタンドアロンの Python アプリケーションでは、以下に示すようにSparkSessionオブジェクトを明示的に作成する必要があります。

pysparkの起動時にspark.mongodb.read.connection.urispark.mongodb.write.connection.uri構成オプションを指定した場合、デフォルトのSparkSessionオブジェクトはそれらを使用します。 pyspark内から独自のSparkSessionオブジェクトを作成する場合は、 SparkSession.builderを使用して別の構成オプションを指定できます。

from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \
.getOrCreate()

SparkSessionオブジェクトを使用して、MongoDB へのデータの書き込み、MongoDB からのデータの読み取り、DataFrames の作成、SQL 操作の実行を行えます。

重要

Connector のバージョン10.0.0以降では、MongoDB からの読み取りと MongoDB への書込みに形式mongodbを使用します。

df = spark.read.format("mongodb").load()

Spark shell を起動するときに、以下を指定します。

  • MongoDB Spark Connector パッケージをダウンロードするには、 --packagesオプションを使用します。 次のパッケージが利用できます。

    • mongo-spark-connector

  • MongoDB Spark Connector を構成するには、 --confオプションを使用します。 これらの設定はSparkConfオブジェクトを構成します。

    注意

    SparkConf を使用してSpark Connectorを構成する場合は、 の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。

たとえば、

./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.5.0
  • spark.mongodb.read.connection.uriでは、MongoDB サーバーのアドレス( 127.0.0.1 )、接続するデータベース( test )、データを読み取るコレクション( myCollection )と読み込み設定(read preference)を指定します。

  • spark.mongodb.write.connection.uriは、MongoDB サーバーアドレス( 127.0.0.1 )、接続するデータベース( test )、データを書き込むコレクション( myCollection )を指定します。 デフォルトでポート27017に接続します。

  • packagesオプションは、Spark Connector の Maven 座標をgroupId:artifactId:version形式で指定します。

Spark shell に次のパッケージをインポートして、 オブジェクトと オブジェクトで MongoDB Connector 固有の関数と暗黙的な機能を有効にします。SparkSessionDataset

import com.mongodb.spark._

MongoDB への接続は、Dataset アクションで MongoDB からの読み取りまたは MongoDB への書き込みが必要な場合に、自動的に行われます。

Spark Core、Spark SQL、MongoDB Spark Connector の依存関係を依存関係管理ツールに指定します。

次の図は、これらの依存関係を SFTbuild.scala に含める方法を示しています。 ファイル:

scalaVersion := "2.12",
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.5.0",
"org.apache.spark" %% "spark-core" % "3.3.1",
"org.apache.spark" %% "spark-sql" % "3.3.1"
)

SparkSession経由で Connector 構成を指定する場合、 設定の前に適切に設定する必要があります。 詳細とその他の使用可能な MongoDB Spark Connector オプションについては、「 Spark の構成」ガイドを参照してください。

package com.mongodb
object GettingStarted {
def main(args: Array[String]): Unit = {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
}
}

java.net.BindException: Can't assign requested addressを取得した場合、

  • 別の Spark shell がすでに実行されていないことを確認してください。

  • SPARK_LOCAL_IP環境変数を設定してみてください。例:

    export SPARK_LOCAL_IP=127.0.0.1
  • Spark shell を起動するときに、次のオプションを含めてみてください。

    --driver-java-options "-Djava.net.preferIPv4Stack=true"

このチュートリアルの例の実行中にエラーが発生した場合は、ローカル Ividy キャッシュ( ~/.ivy2/cache/org.mongodb.spark~/.ivy2/jars )をクリアする必要がある場合があります。

次のセクションでは、 SparkとMongoDB Spark コネクタを統合できる一般的なサードパーティのプラットフォームについて説明します。

Amazon EDR は、 Sparkのような大規模なデータフレームワークを実行するために使用できるマネージドクラスタープラットフォームです。EDR クラスターにSparkをインストールするには、 Amazon Web Services のドキュメントの「 Amazon EDR を使い始める 」を参照してください。

Databricks は、エンタープライズ レベルのデータをビルド、配置、共有するための分析プラットフォームです。MongoDB Spark コネクタ をDatabricks と統合するには、Databricks ドキュメントの「 MongoDB 」を参照してください。

Docker は、開発者がコンテナ内でアプリケーションを構築、共有、実行するのに役立つオープンソース プラットフォームです。

Kubernetes は、コンテナ化マネジメントを自動化するためのオープンソース プラットフォームです。KubernetesでSparkを実行するには、 Sparkドキュメントの「 KubernetesでのSparkの実行 」を参照してください。

戻る

Overview

項目一覧