AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

ストリーミング読み取り構成オプション

ストリーミング モードで MongoDB からデータを読み取るときに、次のプロパティを構成できます。

注意

SparkConfを使用してコネクターの読み取り構成を設定する場合は、各プロパティの前にspark.mongodb.read.を付けます。

プロパティ名
説明

connection.uri

必須。


接続文字列の構成キー。デフォルト:mongodb://localhost:27017/

database

必須。
データベース名の構成。

collection

必須。
コレクション名の構成。コレクション名をカンマで区切ることで、複数のコレクションを指定できます。複数のコレクションを指定する方法の詳細については、「


プロパティでの複数のコレクションの指定 collection」を参照してください。

comment

読み取り操作に追加するコメント。コメントは データベースプロファイラー

の出力に表示されます。デフォルト: なし

mode

期待されるスキーマに一致しないドキュメントを処理する場合に使用する解析戦略。 このオプションは、次の値を受け入れます。

  • ReadConfig.ParseMode.FAILFAST: スキーマに一致しないドキュメントを解析するときに例外をスローします。

  • ReadConfig.ParseMode.PERMISSIVE: データ型がスキーマと一致しない場合、フィールドをnullに設定します。 無効な各ドキュメントを拡張 JSON string として保存するには、この値をcolumnNameOfCorruptRecordオプションと組み合わせます。

  • ReadConfig.ParseMode.DROPMALFORMED: スキーマに一致しないドキュメントを無視します。


デフォルト: ReadConfig.ParseMode.FAILFAST

columnNameOfCorruptRecord

modeReadConfig.ParseMode.PERMISSIVEオプションを に設定すると、このオプションは無効なドキュメントを 拡張JSONとして保存する新しい列の名前を指定します。明示的なスキーマを使用している場合は、新しい列の名前を含める必要があります。推論されたスキーマを使用している場合、 Spark コネクタ はスキーマの末尾に新しい列を追加します。デフォルト:

なし

mongoClientFactory

MongoClientFactory 構成キー。カスタム実装を指定できます。これは
com.mongodb.spark.sql.connector.connection.MongoClientFactory

インターフェースを実装する必要があります。デフォルト:com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

aggregation.pipeline

Sparkにデータを送信する前にコレクションに適用するカスタム集計パイプラインを指定します。値は、拡張JSON単一ドキュメントまたはドキュメントのリストである必要があります。単一のドキュメントは次のようになります。

{"$match": {"closed": false}}

ドキュメントのリストは次のようになります。

[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

カスタム集計パイプラインは、パーティショニング戦略と互換性がある必要があります。 たとえば、 $groupなどの集計ステージは、複数のパーティションを作成するパーティショニングでは機能しません。

aggregation.allowDiskUse

集計を実行中ときにディスクへのストレージを許可するかどうかを指定します。デフォルト:

true

change.stream.

ストリーム構成のプレフィックスを変更します。変更ストリームの詳細については、「
変更ストリーム構成 」セクションを参照してください。

outputExtendedJson

trueの場合、コネクタはSparkでサポートされていないBSON型を拡張JSON文字列に変換します。false の場合、コネクタはサポートされていない型に対して元の緩和型JSON形式を使用します。デフォルト:

false

schemaHints

コレクションのスキーマを推論するときに使用する既知のフィールドタイプの部分的なスキーマを指定します。schemaHints オプションの詳細については、「 スキーマ ヒントで既知のフィールドを指定する 」セクションを参照してください。デフォルト:

なし

MongoDB から変更ストリームを読み取るときに、次のプロパティを構成できます。

プロパティ名
説明

change.stream.lookup.full.document

アップデート操作時に変更ストリームが返す値を決定します。

デフォルト設定では、元のドキュメントと更新されたドキュメントの差が返されます。

updateLookup設定は元のドキュメントと更新されたドキュメントの差も返しますが、更新されたドキュメント全体のコピーも含まれます。

この変更ストリーム オプションの機能の詳細については、MongoDB サーバー マニュアル ガイド「 更新操作のための完全なドキュメントの検索 」を参照してください。

デフォルト: "default"

change.stream.micro.batch.max.partition.count

Spark コネクタが各マイクロ バッチを に分割するパーティションの最大数。 Sparkワーカーはこれらのパーティションを並列に処理できます。この設定は、マイクロバッチ ストリームを使用する場合にのみ適用されます。デフォルト:



1

警告: 1より大きい値を指定すると、Spark Connector が変更イベントを処理する順序が変更される可能性があります。 順序以外の処理によって下流でデータの不整合が発生する可能性がある場合は、この設定を避けます。

change.stream.publish.full.document.only

変更されたドキュメントを公開するか、完全な変更ストリームドキュメントを公開するかを指定します。この設定が

falseの場合、スキーマを指定する必要があります。スキーマには、変更ストリームから読み取るすべてのフィールドを含める必要があります。オプション フィールドを使用すると、スキーマがすべての変更ストリーム イベントで有効であることを確認できます。この設定が

trueの場合、コネクタは次の動作を示します。

  • connectorは、 fullDocumentフィールドが省略されているメッセージをフィルタリングで除外し、 フィールドの値のみを公開します。

  • スキーマを指定しない場合、connector は 変更ストリーム ドキュメント からスキーマを推論します。

この設定はchange.stream.lookup.full.document設定を上書きします。

デフォルトfalse

change.stream.startup.mode

オフセットが使用できない場合にコネクタが起動する方法を指定します。

この設定では、次の値を受け入れます。

  • latest: コネクタは、最新のイベントから変更イベントの処理を開始します。 以前に処理されていないイベントは処理されません。

  • timestamp: コネクタは指定された時刻に変更イベントの処理を開始します。

    timestampオプションを使用するには、 change.stream.startup.mode.timestamp.start.at.operation.time設定を使用して時間を指定する必要があります。 この設定では、次の形式のタイムスタンプが受け入れられます。

    • UNIXエポック からの秒数を表す整数

    • ISO-8601形式の日付と時刻(1 秒の精度)

    • 拡張 JSON BsonTimestamp

    デフォルトlatest

change.stream.lookup.full.document.before.change

変更されたドキュメントの変更前のイメージを変更ストリーム出力

に含めるかどうかを決定します。この設定では、次の値を受け入れます。

  • default: このオプションは off 値と同じです。

  • off:変更ストリーム出力に変更されたドキュメントの変更前のイメージを含めません。

  • whenAvailable: 変更前のイメージが利用可能な場合、変更ストリーム出力に変更されたドキュメントの変更前のイメージを含めます。

  • required: 変更されたドキュメントの変更前のイメージを変更ストリーム出力 に含めます。変更されたドキュメントで変更前のイメージが利用できない場合、コネクタはエラーをスローします。

デフォルト: default

SparkConfを使用して以前の設定のいずれかを指定する場合は、それらをconnection.uri設定に含めるか、個別に一覧表示できます。

次のコード例は、 connection.uri設定の一部としてデータベース、コレクション、読み込み設定(read preference)を指定する方法を示しています。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

connection.uriを短くして設定を読みやすくするには、代わりにこれらを個別に指定します。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection

重要

connection.uriとその行の両方に 設定を指定すると、 connection.uriの設定が優先されます。 たとえば、次の構成では、接続データベースはfoobarです。これはconnection.uri設定の 値であるためです。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

コレクション名をカンマで区切って、 collection変更ストリーム構成プロパティで複数のコレクションを指定できます。 スペースがコレクション名の一部でない限り、コレクション間にスペースを入れないでください。

次の例に示すように、複数のコレクションを指定します。

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

コレクション名が「*」の場合、または名前にコンマまたはバックスラッシュ(\)が含まれている場合は、次のように文字をエスケープする必要があります。

  • collection 構成オプションで使用されるコレクションの名前にカンマが含まれている場合、 Spark Connectorはそれを 2 つの異なるコレクションとして扱います。 これを回避するには、カンマの前にバックスラッシュ(\)を付けてコンマをエスケープする必要があります。 「my,collection」という名前のコレクションを次のようにエスケープします。

    "my\,collection"
  • collection 構成オプションで使用されるコレクションの名前が「*」の場合、 Spark Connectorはそれをすべてのコレクションをスキャンするための仕様と解釈します。 これを回避するには、アスタリスクの前にバックスラッシュ(\)を付けてアスタリスクをエスケープする必要があります。 「*」という名前のコレクションを次のようにエスケープします。

    "\*"
  • collection 構成オプションで使用されるコレクションの名前にバックスラッシュ(\)が含まれている場合、 Spark Connectorはバックスラッシュをエスケープ文字として扱い、 値の解釈方法が変更される可能性があります。 これを回避するには、バックスラッシュの前に別のバックスラッシュを付けて、バックスラッシュをエスケープする必要があります。 "\collection" という名前のコレクションを次のようにエスケープします。

    "\\collection"

    注意

    Java でコレクション名を string リテラルとして指定する場合は、それぞれのバックスラッシュを別のバックスラッシュでさらにエスケープする必要があります。 たとえば、"\collection" という名前のコレクションは、次のようにエスケープします。

    "\\\\collection"

コレクション名のstringとしてアスタリスク(*)を渡すことで、 データベース内のすべてのコレクションからストリーミングできます。

次の例に示すように、すべてのコレクションを指定します。

...
.option("spark.mongodb.collection", "*")

すべてのコレクションからストリーミング中にコレクションを作成すると、新しいコレクションは自動的にストリームに含まれます。

複数のコレクションからストリーミングしているときに、いつでもコレクションを削除できます。

重要

複数のコレクションを使用したスキーマの推論

change.stream.publish.full.document.onlyオプションをtrueに設定すると、Spark Connector はスキャンされたドキュメントのスキーマを使用してDataFrameのスキーマを推論します。

スキーマ推論はストリーミングの開始時に行われ、ストリーミング中に作成されたコレクションは考慮されません。

複数のコレクションからストリーミングしてスキーマを推論する場合、コネクタは各コレクションを順番にサンプリングします。 多数のコレクションからストリーミングすると、スキーマ推論のパフォーマンスが大幅に低下する可能性があります。 このパフォーマンスへの影響は、スキーマを推論している間のみ発生します。