Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

$source ステージ(Stream Processing)

$source

$sourceステージでは、データをストリーミングするための接続を接続レジストリで指定します。次の接続タイプがサポートされています。

  • Apache Kafkaエージェント

  • MongoDB コレクションの変更ストリーム

  • MongoDB database 変更ストリーム

  • MongoDBクラスターの変更ストリーム

  • AWS Kinesisデータストリーム

  • ドキュメント配列

Apache Kafkaプロバイダーからのストリーミングデータを使用する場合、$source ステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"schemaRegistry": {
"connectionName": "<schema-registry-name>",
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続レジストリ内の接続を識別するラベル。

topic

文字列または複数の文字列の配列

必須

メッセージをストリーミングする 1 つ以上の Apache Kafka トピックの名前。複数のトピックからのメッセージをストリーミングする場合は、配列で指定します。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージフィールドを引数として受け取る :式:$toDate

  • ソース メッセージフィールドを引数として受け取る :式:$dateFromString

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

partitionIdleTimeout

ドキュメント

任意

証明機関の計算で無視される前に、パーティションがアイドル状態になることを許可する時間を指定するドキュメント。

このフィールドはデフォルトで無効になっています。アイドル状態で進まないパーティションを処理するには、このフィールドに値を設定します。

partitionIdleTimeout.size

integer

任意

パーティションのアイドル タイムアウトの期間を指定する数値。

partitionIdleTimeout.unit

string

任意

パーティション アイドル タイムアウトの期間の単位。

unitの値は次のいずれかになります。

  • "ms" (ミリ秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

schemaRegistry

ドキュメント

任意

平均直列化されたソースからの読み取りをサポートするためにスキーマ レジストリの使用を有効にするドキュメント。

この機能を有効にするには、スキーマ Registry 接続を作成する必要があります。

schemaRegistry.connectionName

string

条件付き

Avro 非直列化に使用するスキーマ レジストリ接続の名前。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.auto_offset_reset

string

任意

Apache Kafkaソーストピック内のどのイベントで取り込みを開始するかを指定します。auto_offset_reset は次の値を取ります。

  • endlatest 、またはlargest : 集計が初期化されたときに、トピックの最新のイベントから取り込みを開始します。

  • earliestbeginning 、またはsmallest : トピック内の最も近いイベントから取り込みを開始します。

デフォルトは latest です。

config.group_id

string

任意

ストリーム プロセッサと関連付ける Kafka コンシューマー グループの ID。省略した場合、Atlas Stream Processing は、Stream Processing ワークスペースを次の形式の自動生成された ID に関連付けます。

asp-${streamProcessorId}-consumer

Atlas Stream Processing は、すべての永続ストリーム プロセッサに対してこのパラメーターの値を自動的に生成します。SP.process() で定義されたエフェメラル ストリーム プロセッサの場合、このパラメーターは手動で定義した場合にのみ設定されます。

config.enable_auto_commit

ブール値

条件付き

Kafkaプロバイダー パーティション オフセットのコミット ポリシーを決定するフラグ。Atlas Stream Processing は 2 つのコミット ポリシーをサポートしています。

  • このパラメータを true に設定すると、$source ステージが次の演算子にデータを渡すたびに、Atlas Stream Processing はオフセットをコミットします。

  • このパラメータを false に設定すると、Atlas Stream Processing がチェックポイントを取得するときに、ストリーム プロセッサはパーティション オフセットをコミットします。

このパラメータは、config.group_id が設定されている場合にのみ設定できます。

SP.process() で定義されたエフェメラル ストリーム プロセッサの場合、group_id を設定しない限り、このパラメータはデフォルトで false に設定されます。それ以外の場合、デフォルトは true になります。

Kafka を$source として使用する場合のオフセットの詳細については、 Kafkaソースとコンシューマー グループ オフセット を参照してください。

config.keyFormat

string

任意

Apache Kafkaキー データを逆直列化するために使用されるデータ型。次のいずれかの値である必要があります。

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

デフォルトは binData です。

config.keyFormatError

string

任意

Apache Kafkaキー データを逆直列化するときに発生したエラーの処理方法。次のいずれかの値である必要があります。

注意

Atlas Stream Processing では、ソース データ ストリーム内のドキュメントが有効なjsonまたはejsonである必要があります。 Atlas Stream Processing は、この要件を満たさないドキュメントをデッド レター キューに設定します(デッド レター キューを設定している場合)。

Atlas コレクションの変更ストリームを使用すると、アプリケーションは単一のコレクションにおけるリアルタイムデータの変更にアクセスできます。コレクションに対する変更ストリームを開く方法については、変更ストリームを参照してください。

変更ストリーム$sourceを使用する場合は、ソースクラスターで少なくとも24時間のoplog windowを構成してください。

変更ストリームを読み取るために、Atlas Stream Processing は oplog コレクションをスキャンします。その結果、ログに COLLSCAN 警告が表示されることがあります。これらの警告は通常の動作を示しており、エラーを示すものではありません。

config.fullDocumentまたはconfig.fullDocumentBeforeChangerequiredに設定する場合は、キャプチャー対象の書き込み操作を実行する前に、各コレクションでchangeStreamPreAndPostImagesを有効にしてください。書き込み発生時に機能が有効化されていなかった場合、またはpost-imageの有効期限が切れていた場合に、イベントでpost-imageを利用できないと、ストリームプロセッサーは失敗します。変更前ドキュメントおよび変更後ドキュメントを有効化する方法については、「ドキュメントのPre-ImageおよびPost-Imageを使用したChange Streams」を参照してください。

Atlas コレクションの変更ストリームからのストリーミング データを操作する場合、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"initialSync": {
"enable": <boolean>,
"parallelism": <integer>
},
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}],
"maxAwaitTimeMS": <time-ms>,
}
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

db

string

必須

connectionNameによって指定された Atlas インスタンスでホストされている MongoDB database の名前。 このデータベースの変更ストリームは、ストリーミング データソースとして機能します。

coll

文字列または複数の文字列の配列

必須

connectionName によって指定された Atlasインスタンスでホストされている 1 つ以上のMongoDBコレクションの名前。 これらのコレクションの変更ストリームは、ストリーミングデータソースとして機能します。 このフィールドを省略すると、ストリーム プロセッサはMongoDB Database Change Stream を使用します。

initialSync

ドキュメント

任意

initialSync 機能に関連するフィールドを含むドキュメント。

Atlas Stream Processing initialSync を使用すると、 changeEvent ドキュメントを挿入するのと同様に、Atlasコレクションに既存のドキュメントを取り込むことができます。initialSync を有効にしている場合、ストリーム プロセッサを起動すると、まずコレクション内のすべての既存のドキュメントを取り込んで処理してから、新しい受信 changeEvent ドキュメントの取り込みと処理に進みます。initialSync が完了すると、繰り返されません。

initialSync を有効にすると、パイプラインで $hoppingWindow$sessionWindow、または $tumblingWindow ステージを使用できなくなります。

重要: initialSync は、受信ドキュメントの _id 値がデフォルトで生成された ObjectId 値または順序付けられた intもしくはlong 値であるコレクションでのみ使用できます。すべての _id 値は同じタイプである必要があります。

initialSync.enable

ブール値

条件付き

initialSync を有効にするかどうかを決定します。initialSyncフィールドを宣言する場合は、このフィールドを に設定する必要があります。

initialSync.parallelism

integer

任意

initialSync操作を処理する並列処理のレベルを決定します。値を指定しない場合、デフォルトは 1 になります。

各ストリーム プロセッサには、その階層によって決定される最大累積並列処理値があります。ストリーム プロセッサの累積並列処理は、次のように計算されます。

parallelism total - parallelized stages

parallelism total$source$lookup ステージ、$merge ステージにおいて 1 を超えるすべての parallelism 値の合計であり、parallelized stagesparallelism の値が 1 より大きいこれらのステージの数です。

例、$source ステージが 4parallelism 値を設定し、$lookup ステージでは parallelism 値が設定されていない(デフォルトは 1)、かつ $merge ステージが parallelism に設定されている場合: 2 の } 値がある場合は 2 つの parallelized stages があり、ストリーム プロセッサの累積並列処理は (4 + 2) - 2 として計算されます。

ストリーム プロセッサがその階層の最大累積並列処理を超える場合、Atlas Stream Processing はエラーをスローし、目的のレベルの並列処理に必要な最小プロセッサ階層について提案します。エラーを解決するには、プロセッサをより高い階層に増やすアップするか、ステージの並列処理値を低くする必要があります。詳しくは、Stream Processingをご覧ください。

readPreference

ドキュメント

任意

読み込み設定 (read preference)変更ストリームとinitialSync 操作の 。

デフォルトは primary です。

readPreferenceTags

ドキュメント

任意

読み込み設定 (read preference) タグ変更ストリームとinitialSync 操作の。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

タイムスタンプ | date

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

MongoDB 拡張 JSON$date または $timestamp の値を受け付けます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • default : update 操作の場合、完全なドキュメントを返しません。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

集計パイプラインを指定し、変更ストリーム出力がパスされ、さらなるプロセシングが実行される前にフィルタリングします。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。

重要: 各変更イベントには wallTime フィールドと clusterTime フィールドが含まれます。$source 以降の Atlas Stream Processing ステージでは、プロセッサがこれらのフィールドを取り込んだため、受信することが想定されています。Change Stream データの適切なプロセシングを確保するために、$source.config.pipeline ではこれらのフィールドを変更しないでください。

config.maxAwaitTimeMS

integer

任意

空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。

デフォルトは 1000 です。

Atlas データベースの変更ストリームを使用すると、アプリケーションは単一のデータベースでリアルタイムデータの変更にアクセスできます。データベースに対して変更ストリームを開く方法については、変更ストリームを参照してください。

変更ストリーム$sourceを使用する場合は、ソースクラスターで少なくとも24時間のoplog windowを構成してください。

変更ストリームを読み取るために、Atlas Stream Processing は oplog コレクションをスキャンします。その結果、ログに COLLSCAN 警告が表示されることがあります。これらの警告は通常の動作を示しており、エラーを示すものではありません。

config.fullDocumentまたはconfig.fullDocumentBeforeChangerequiredに設定する場合は、キャプチャー対象の書き込み操作を実行する前に、各コレクションでchangeStreamPreAndPostImagesを有効にしてください。書き込み発生時に機能が有効化されていなかった場合、またはpost-imageの有効期限が切れていた場合に、イベントでpost-imageを利用できないと、ストリームプロセッサーは失敗します。変更前ドキュメントおよび変更後ドキュメントを有効化する方法については、「ドキュメントのPre-ImageおよびPost-Imageを使用したChange Streams」を参照してください。

Atlas データベース変更ストリームからのストリーミング データを操作する場合、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

db

string

必須

connectionNameによって指定された Atlas インスタンスでホストされている MongoDB database の名前。 このデータベースの変更ストリームは、ストリーミング データソースとして機能します。

readPreference

ドキュメント

任意

readPreferenceTags

ドキュメント

任意

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

タイムスタンプ

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

MongoDB 拡張 JSON$date または $timestamp の値を受け付けます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

変更ストリーム出力を発生元でフィルタリングするための集計パイプラインを指定します。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。

重要: 各変更イベントには wallTime フィールドと clusterTime フィールドが含まれます。$source 以降の Atlas Stream Processing ステージでは、プロセッサがこれらのフィールドを取り込んだため、受信することが想定されています。Change Stream データの適切なプロセシングを確保するために、$source.config.pipeline ではこれらのフィールドを変更しないでください。

config.maxAwaitTimeMS

integer

任意

空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。

デフォルトは 1000 です。

変更ストリーム$sourceを使用する場合は、ソースクラスターで少なくとも24時間のoplog windowを構成してください。

変更ストリームを読み取るために、Atlas Stream Processing は oplog コレクションをスキャンします。その結果、ログに COLLSCAN 警告が表示されることがあります。これらの警告は通常の動作を示しており、エラーを示すものではありません。

config.fullDocumentまたはconfig.fullDocumentBeforeChangerequiredに設定する場合は、キャプチャー対象の書き込み操作を実行する前に、各コレクションでchangeStreamPreAndPostImagesを有効にしてください。書き込み発生時に機能が有効化されていなかった場合、またはpost-imageの有効期限が切れていた場合に、イベントでpost-imageを利用できないと、ストリームプロセッサーは失敗します。変更前ドキュメントおよび変更後ドキュメントを有効化する方法については、「ドキュメントのPre-ImageおよびPost-Imageを使用したChange Streams」を参照してください。

Atlas クラスター変更ストリーム全体からのストリーミングデータを操作するには、$source ステージのプロトタイプ形式は次のようになります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

readPreference

ドキュメント

任意

readPreferenceTags

ドキュメント

任意

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

日付 | タイムスタンプ

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

MongoDB 拡張 JSON$date または $timestamp の値を受け付けます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

変更ストリーム出力を発生元でフィルタリングするための集計パイプラインを指定します。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。

Atlas Stream Processing は、取り込まれた各変更イベントから wallTime フィールドと clusterTime フィールドを受信することが予想されていることに注意してください。Change Stream データの適切な処理を確保するために、$source.config.pipeline ではこれらのフィールドを変更しないでください。

config.maxAwaitTimeMS

integer

任意

空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。

デフォルトは 1000 です。

Atlas Stream Processing は、 AWS Kinesisストリームへの Private Link 接続 の作成をサポートしています。詳細については、Kinesis Private Link 接続の追加 を参照してください。

AWS Kinesisデータストリームのデータを操作する場合、$source ステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"stream": "<stream-name>",
"region": "<aws-region>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<field-name>",
"shardIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"consumerARN": "<aws-arn>",
"initialPosition": <initial-position>,
reshardDetectionIntervalSecs: <interval>
}
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続レジストリ内の接続を識別するラベル。

config.consumerARN

string

必須

stream

string

必須

メッセージをストリーミングするAWS Kinesisデータストリーム。

region

string

条件付き

指定されたストリームが存在するAWSリージョン。Kinesis は、異なるリージョンで同じ名前の複数のデータ ストリームをサポートしています。同じ接続内の 2 つ以上のリージョンのデータストリームに同じ名前を使用する場合は、使用する名前とリージョンの組み合わせを指定するために、このフィールドを使用する必要があります。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

shardIdleTimeout

ドキュメント

任意

埋め込みの計算で無視される前に、シャードがアイドル状態になる時間を指定するドキュメント。

このフィールドは、デフォルトで無効になっています。アイドル状態であるため、前に移動しないシャードを取り扱うには、このフィールドに値を設定します。

shardIdleTimeout.size

ドキュメント

任意

シャード アイドル タイムアウトの期間を指定する数値。

shardIdleTimeout.unit

ドキュメント

任意

シャード アイドル タイムアウトの期間の単位。

unitの値は次のいずれかになります。

  • "ms" (ミリ秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.initialPosition

string

任意

メッセージの取り込みを開始するKinesisデータストリームの履歴内の位置。次のいずれかである必要があります。

  • "TRIM_HORIZON": シャード内の最も古いメッセージから取り込みを開始します。

  • "LATEST": シャード内の最新のメッセージから取り込みを開始します。

デフォルトは "LATEST" です。

reshardDetectionIntervalSecs

integer

任意

ドキュメントの配列を操作するために、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"documents" : [{source-doc},...] | <expression>
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

documents

配列

条件付き

ストリーミング データソースとして使用するドキュメントの配列。 このフィールドの値は、オブジェクトの配列、またはオブジェクトの配列として評価される 式 のいずれかになります。 connectionNameフィールドを使用する場合は、このフィールドを使用しないでください。

$sourceは、それが表示されるすべてのパイプラインの最初のステージである必要があります。パイプラインごとに使用できる$source ステージは 1 つだけです。

Kafka $source ステージでは、Atlas Stream Processing はソーストピック内の複数のパーティションから並列に読み取りを行います。パーティションの制限は、プロセッサ階層によって決まります。詳しくは、Stream Processing 請求参照を参照してください。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. $sourceステージでは、 という名前のトピックでこれらのレポートを収集するApachemy_weatherdata Kafkaプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれる際に公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTime に設定されます。

  2. $matchステージでは、dewPoint.value 5.0が 以下のドキュメントを除外し、 がdewPoint.value 5.0より大きいドキュメントを次のステージに渡します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

[{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata"
}
},
{
"$match": { "dewPoint.value": { "$gt": 5 } }
},
{
"$merge": {
"into": {
"connectionName": "weatherStreamOutput",
"db": "sample_weatherstream",
"coll": "stream"
}
}
}]

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: {
code: 'N',
period: 99.9,
quantity: '9',
value: -30.4
},
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: {
condition: '9',
depth: 160,
period: 24,
quality: '2'
},
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 4
},
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

次の集計は、cluster0-collection ソースからデータを取り込み、サンプルデータセットがロードされた Atlas クラスターに接続します。Stream Processing ワークスペースを作成し、Atlas クラスターへの接続を接続レジストリに追加する方法については、Atlas Stream Processing の始め方 をご覧ください。この集計は 2 つのステージを実行して、sample_weatherdata データベースの data コレクションに対する変更ストリームを開き、変更を記録します。

  1. $sourceステージはcluster0-collection ソースに接続し、data sample_weatherdataデータベース内の コレクションに対して変更ストリームを開きます。

  2. $merge ステージは、フィルタリングされた変更ストリームドキュメントを、sample_weatherdata データベース内の data_changes という名前の Atlas コレクションに書き込みます。そのようなコレクションが存在しない場合、Atlas が作成します。

[{
"$source": {
"connectionName": "cluster0-connection",
"db": "sample_weatherdata",
"coll": "data"
}
},
{
"$merge": {
"into": {
"connectionName": "cluster0-connection",
"db": "sample_weatherdata",
"coll": "data_changes"
}
}
}]

次の mongosh コマンドは data ドキュメントを削除します。

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

data ドキュメントが削除された後、ストリームプロセッサは変更ストリームイベントドキュメントを sample_weatherdata.data_changes コレクションに書き込みます。結果の sample_weatherdata.data_changes コレクション内のドキュメントを表示するには、mongosh を使用して Atlas クラスターに接続し、次のコマンドを実行してください。

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]

次の集計では、インラインドキュメント配列 をストリーミングデータソースとして使用し、3 つのロケーションの気象測定が含まれます。配列は、気象用サンプル データセットと同じスキーマを使用します。この集計は3 つのステージを実行します。

  1. $sourcedocumentsステージでは、気象測定のインライン 配列をストリーミングデータソースとして定義し、timeField timestampを使用して各ドキュメントの フィールドを認証タイムスタンプとして指定します。

  2. $match ステージでは、dewPoint.value5.0 を超えるドキュメントのみが次のステージに渡されます。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

[{
"$source": {
"documents": [
{
"location": "New York",
"timestamp": ISODate('2024-01-15T08:00:00Z'),
"temp": 23.5,
"dewPoint": { "value": 6.2 }
},
{
"location": "Los Angeles",
"timestamp": ISODate('2024-01-15T08:05:00Z'),
"temp": 18.2,
"dewPoint": { "value": 4.8 }
},
{
"location": "Chicago",
"timestamp": ISODate('2024-01-15T08:10:00Z'),
"temp": 26.8,
"dewPoint": { "value": 7.5 }
}
]
}
},
{
"$match": { "dewPoint.value": { "$gt": 5.0 } }
},
{
"$merge": {
"into": {
"connectionName": "weatherStreamOutput",
"db": "sample_weatherstream",
"coll": "stream"
}
}
}]

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
[
{
_id: ObjectId('67a6c3df14fcac13b1a21a01'),
dewPoint: { value: 6.2 },
location: 'New York',
temp: 23.5,
timestamp: ISODate('2024-01-15T08:00:00.000Z')
},
{
_id: ObjectId('67a6c3df14fcac13b1a21a03'),
dewPoint: { value: 7.5 },
location: 'Chicago',
temp: 26.8,
timestamp: ISODate('2024-01-15T08:10:00.000Z')
}
]