Overview
ストリーミング モードで MongoDB からデータを読み取るときに、次のプロパティを構成できます。
注意
SparkConfを使用してコネクターの読み取り構成を設定する場合は、各プロパティの前にspark.mongodb.read.を付けます。
プロパティ名 | 説明 | ||
|---|---|---|---|
| 必須。 | ||
| 必須。 | ||
| 必須。 | ||
| 読み取り操作に追加するコメント。コメントは データベースプロファイラー | ||
| 期待されるスキーマに一致しないドキュメントを処理する場合に使用する解析戦略。 このオプションは、次の値を受け入れます。
| ||
|
| ||
| MongoClientFactory 構成キー。カスタム実装を指定できます。これは | ||
| Sparkにデータを送信する前にコレクションに適用するカスタム集計パイプラインを指定します。値は、拡張JSON単一ドキュメントまたはドキュメントのリストである必要があります。単一のドキュメントは次のようになります。 ドキュメントのリストは次のようになります。 カスタム集計パイプラインは、パーティショニング戦略と互換性がある必要があります。 たとえば、 | ||
| 集計を実行中ときにディスクへのストレージを許可するかどうかを指定します。デフォルト: | ||
| ストリーム構成のプレフィックスを変更します。変更ストリームの詳細については、「 | ||
|
| ||
| コレクションのスキーマを推論するときに使用する既知のフィールドタイプの部分的なスキーマを指定します。 |
ストリーム構成の変更
MongoDB から変更ストリームを読み取るときに、次のプロパティを構成できます。
プロパティ名 | 説明 |
|---|---|
| アップデート操作時に変更ストリームが返す値を決定します。 デフォルト設定では、元のドキュメントと更新されたドキュメントの差が返されます。
この変更ストリーム オプションの機能の詳細については、MongoDB サーバー マニュアル ガイド「 更新操作のための完全なドキュメントの検索 」を参照してください。 デフォルト: "default" |
| Spark コネクタが各マイクロ バッチを に分割するパーティションの最大数。 Sparkワーカーはこれらのパーティションを並列に処理できます。この設定は、マイクロバッチ ストリームを使用する場合にのみ適用されます。デフォルト: 警告: |
| 変更されたドキュメントを公開するか、完全な変更ストリームドキュメントを公開するかを指定します。この設定が
この設定は デフォルト: |
| オフセットが使用できない場合にコネクタが起動する方法を指定します。 この設定では、次の値を受け入れます。
|
| 変更されたドキュメントの変更前のイメージを変更ストリーム出力
デフォルト: |
でのプロパティの指定 connection.uri
SparkConfを使用して以前の設定のいずれかを指定する場合は、それらをconnection.uri設定に含めるか、個別に一覧表示できます。
次のコード例は、 connection.uri設定の一部としてデータベース、コレクション、読み込み設定(read preference)を指定する方法を示しています。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
connection.uriを短くして設定を読みやすくするには、代わりにこれらを個別に指定します。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection
重要
connection.uriとその行の両方に 設定を指定すると、 connection.uriの設定が優先されます。 たとえば、次の構成では、接続データベースはfoobarです。これはconnection.uri設定の 値であるためです。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
プロパティで複数のコレクションを指定collection
コレクション名をカンマで区切って、 collection変更ストリーム構成プロパティで複数のコレクションを指定できます。 スペースがコレクション名の一部でない限り、コレクション間にスペースを入れないでください。
次の例に示すように、複数のコレクションを指定します。
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
コレクション名が「*」の場合、または名前にコンマまたはバックスラッシュ(\)が含まれている場合は、次のように文字をエスケープする必要があります。
collection構成オプションで使用されるコレクションの名前にカンマが含まれている場合、 Spark Connectorはそれを 2 つの異なるコレクションとして扱います。 これを回避するには、カンマの前にバックスラッシュ(\)を付けてコンマをエスケープする必要があります。 「my,collection」という名前のコレクションを次のようにエスケープします。"my\,collection" collection構成オプションで使用されるコレクションの名前が「*」の場合、 Spark Connectorはそれをすべてのコレクションをスキャンするための仕様と解釈します。 これを回避するには、アスタリスクの前にバックスラッシュ(\)を付けてアスタリスクをエスケープする必要があります。 「*」という名前のコレクションを次のようにエスケープします。"\*" collection構成オプションで使用されるコレクションの名前にバックスラッシュ(\)が含まれている場合、 Spark Connectorはバックスラッシュをエスケープ文字として扱い、 値の解釈方法が変更される可能性があります。 これを回避するには、バックスラッシュの前に別のバックスラッシュを付けて、バックスラッシュをエスケープする必要があります。 "\collection" という名前のコレクションを次のようにエスケープします。"\\collection" 注意
Java でコレクション名を string リテラルとして指定する場合は、それぞれのバックスラッシュを別のバックスラッシュでさらにエスケープする必要があります。 たとえば、"\collection" という名前のコレクションは、次のようにエスケープします。
"\\\\collection"
コレクション名のstringとしてアスタリスク(*)を渡すことで、 データベース内のすべてのコレクションからストリーミングできます。
次の例に示すように、すべてのコレクションを指定します。
... .option("spark.mongodb.collection", "*")
すべてのコレクションからストリーミング中にコレクションを作成すると、新しいコレクションは自動的にストリームに含まれます。
複数のコレクションからストリーミングしているときに、いつでもコレクションを削除できます。
重要
複数のコレクションを使用したスキーマの推論
change.stream.publish.full.document.onlyオプションをtrueに設定すると、Spark Connector はスキャンされたドキュメントのスキーマを使用してDataFrameのスキーマを推論します。
スキーマ推論はストリーミングの開始時に行われ、ストリーミング中に作成されたコレクションは考慮されません。
複数のコレクションからストリーミングしてスキーマを推論する場合、コネクタは各コレクションを順番にサンプリングします。 多数のコレクションからストリーミングすると、スキーマ推論のパフォーマンスが大幅に低下する可能性があります。 このパフォーマンスへの影響は、スキーマを推論している間のみ発生します。