Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/
MongoDB Spark Connector
/ /

ストリーミング読み取り構成オプション

ストリーミング モードで MongoDB からデータを読み取るときに、次のプロパティを構成できます。

注意

SparkConfを使用してコネクタの読み取り構成を設定する場合は、各プロパティの前にspark.mongodb.read.を付けます。

プロパティ名
説明

connection.uri

Required.
The connection string configuration key.

Default: mongodb://localhost:27017/

database

Required.
The database name configuration.

collection

Required.
The collection name configuration.

comment

The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None

mongoClientFactory

MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

aggregation.pipeline

Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

カスタム集計パイプラインは、パーティショニング戦略と互換性がある必要があります。 たとえば、 $groupなどの集計ステージは、複数のパーティションを作成するパーティショニングでは機能しません。

aggregation.allowDiskUse

Specifies whether to allow storage to disk when running the aggregation.

Default: true

change.stream.

Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.

outputExtendedJson

When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false

MongoDB から変更ストリームを読み取るときに、次のプロパティを構成できます。

プロパティ名
説明

change.stream.lookup.full.document

アップデート操作時に変更ストリームが返す値を決定します。

デフォルト設定では、元のドキュメントと更新されたドキュメントの差が返されます。

updateLookup設定は元のドキュメントと更新されたドキュメントの差も返しますが、更新されたドキュメント全体のコピーも含まれます。

この変更ストリーム オプションの機能の詳細については、MongoDB サーバー マニュアル ガイド「 更新操作のための完全なドキュメントの検索 」を参照してください。

デフォルト: "default"

change.stream.micro.batch.max.partition.count

The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

警告: 1より大きい値を指定すると、Spark Connector が変更イベントを処理する順序が変更される可能性があります。 順序以外の処理によって下流でデータの不整合が発生する可能性がある場合は、この設定を避けます。

change.stream.publish.full.document.only

Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • connectorは、 fullDocumentフィールドが省略されているメッセージをフィルタリングで除外し、 フィールドの値のみを公開します。

  • スキーマを指定しない場合、コネクタは基礎となるコレクションではなく 、変更ストリームドキュメントからスキーマを推論します。

この設定はchange.stream.lookup.full.document設定を上書きします。

デフォルトfalse

change.stream.startup.mode

Specifies how the connector starts up when no offset is available.
This setting accepts the following values:
  • latest: コネクタは、最新のイベントから変更イベントの処理を開始します。 以前に処理されていないイベントは処理されません。

  • timestamp: コネクタは指定された時刻に変更イベントの処理を開始します。

    timestampオプションを使用するには、 change.stream.startup.mode.timestamp.start.at.operation.time設定を使用して時間を指定する必要があります。 この設定では、次の形式のタイムスタンプが受け入れられます。

    デフォルトlatest

SparkConfを使用して以前の設定のいずれかを指定する場合は、それらをconnection.uri設定に含めるか、個別に一覧表示できます。

次のコード例は、 connection.uri設定の一部としてデータベース、コレクション、読み込み設定(read preference)を指定する方法を示しています。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

connection.uriを短くして設定を読みやすくするには、代わりにこれらを個別に指定します。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

connection.uriとその行の両方に 設定を指定すると、 connection.uriの設定が優先されます。 たとえば、次の構成では、接続データベースはfoobarです。これはconnection.uri設定の 値であるためです。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

戻る

読み取り

項目一覧