Docs Menu
Docs Home
/
Atlas
/ /

$source ステージ(Stream Processing)

$source

$sourceステージでは、データをストリーミングするための接続レジストリで接続を指定します。 次の接続タイプがサポートされています。

注意

Atlas サーバーレスインスタンスを$sourceとして使用することはできません。

Apache Kafkaプロバイダーからのストリーミングデータを使用する場合、$source ステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続レジストリ内の接続を識別するラベル。

topic

文字列または複数の文字列の配列

必須

メッセージをストリーミング配信する 1 つ以上の Apache Kafka トピックの名前。複数のトピックからのメッセージをストリーミングする場合は、配列で指定します。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

partitionIdleTimeout

ドキュメント

任意

証明機関の計算で無視される前に、パーティションがアイドル状態になることを許可する時間を指定するドキュメント。

このフィールドはデフォルトで無効になっています。アイドル状態で進まないパーティションを処理するには、このフィールドに値を設定します。

partitionIdleTimeout.size

integer

任意

パーティションのアイドル タイムアウトの期間を指定する数値。

partitionIdleTimeout.unit

string

任意

パーティション アイドル タイムアウトの期間の単位。

unitの値は次のいずれかになります。

  • "ms" (ミリ秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.auto_offset_reset

string

任意

Apache Kafkaソーストピック内のどのイベントで取り込みを開始するかを指定します。auto_offset_reset は次の値を取ります。

  • endlatest 、またはlargest : 集計が初期化されたときに、トピックの最新のイベントから取り込みを開始します。

  • earliestbeginning 、またはsmallest : トピック内の最も近いイベントから取り込みを開始します。

デフォルトは latest です。

config.group_id

string

任意

ストリーム プロセッサに関連付ける Kafka コンシューマー グループの ID。 省略した場合、Atlas Stream Processing は、ストリーム プロセシング インスタンスを次の形式の自動生成された ID に関連付けます。

asp-${streamProcessorId}-consumer

Atlas Stream Processing は、チェックポイントがコミットされたKafkaに、指定されたグループ IDします。オフセットを超えるメッセージがチェックポイントに永続的に記録されると、オフセットをコミットします。これにより、ストリーム プロセッサのオフセット ラグと進行状況をKafkaプロバイダー コンシューマー グループメタデータから直接追跡できます。

config.keyFormat

string

任意

Apache Kafkaキー データを逆直列化するために使用されるデータ型。次のいずれかの値である必要があります。

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

デフォルトは binData です。

config.keyFormatError

string

任意

Apache Kafkaキー データを逆直列化するときに発生したエラーの処理方法。次のいずれかの値である必要があります。

注意

Atlas Stream Processing では、ソース データ ストリーム内のドキュメントが有効なjsonまたはejsonである必要があります。 Atlas Stream Processing は、この要件を満たさないドキュメントをデッド レター キューに設定します(デッド レター キューを設定している場合)。

Atlas コレクションの変更ストリームを使用すると、アプリケーションは単一のコレクションにおけるリアルタイムデータの変更にアクセスできます。コレクションに対する変更ストリームを開く方法については、変更ストリームを参照してください。

Atlas コレクションの変更ストリームからのストリーミング データを操作する場合、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"initialSync": {
"enable": <boolean>,
"parallelism": <integer>
},
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}],
"maxAwaitTimeMS": <time-ms>,
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
]
}
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

db

string

必須

connectionNameによって指定された Atlas インスタンスでホストされている MongoDB database の名前。 このデータベースの変更ストリームは、ストリーミング データソースとして機能します。

coll

文字列または複数の文字列の配列

必須

connectionName によって指定された Atlasインスタンスでホストされている 1 つ以上のMongoDBコレクションの名前。 これらのコレクションの変更ストリームは、ストリーミングデータソースとして機能します。 このフィールドを省略すると、ストリーム プロセッサはMongoDB Database Change Stream を使用します。

initialSync

ドキュメント

任意

initialSync 機能に関連するフィールドを含むドキュメント。

Atlas Stream Processing initialSync を使用すると、 changeEvent ドキュメントを挿入するのと同様に、Atlasコレクションに既存のドキュメントを取り込むことができます。initialSync を有効にしている場合、ストリーム プロセッサを起動すると、まずコレクション内のすべての既存のドキュメントを取り込んで処理してから、新しい受信 changeEvent ドキュメントの取り込みと処理に進みます。initialSync が完了すると、繰り返されません。

initialSync を有効にすると、パイプラインで $hoppingWindow$sessionWindow、または $tumblingWindow ステージを使用できなくなります。

重要

initialSync は、受信ドキュメントの _id 値がデフォルトで生成された ObjectId 値または順序付けられた intもしくはlong 値であるコレクションでのみ使用できます。すべての _id 値は同じタイプである必要があります。

initialSync.enable

ブール値

条件付き

initialSync を有効にするかどうかを決定します。initialSyncフィールドを宣言する場合は、このフィールドを に設定する必要があります。

initialSync.parallelism

integer

任意

initialSync操作を処理する並列処理のレベルを決定します。値を指定しない場合、デフォルトは 1 になります。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

タイムスタンプ

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

MongoDB 拡張 JSON$date または $timestamp の値を受け付けます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

集計パイプラインを指定し、変更ストリーム出力がパスされ、さらなるプロセシングが実行される前にフィルタリングします。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。

重要

各変更イベントには wallTime フィールドと clusterTime フィールドが含まれます。$source 以降の Atlas Stream Processing ステージでは、プロセッサがこれらのフィールドを取り込んだため、受信することが想定されています。Change Stream データの適切な処理を確保するために、$source.config.pipeline ではこれらのフィールドを変更しないでください。

config.readPreference

ドキュメント

任意

操作の読み込み設定(read initialSyncpreference)。

デフォルトは primary です。

config.readPreferenceTags

ドキュメント

任意

config.maxAwaitTimeMS

integer

任意

空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。

デフォルトは 1000 です。

Atlas データベースの変更ストリームを使用すると、アプリケーションは単一のデータベースでリアルタイムデータの変更にアクセスできます。データベースに対して変更ストリームを開く方法については、変更ストリームを参照してください。

Atlas データベース変更ストリームからのストリーミング データを操作する場合、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

db

string

必須

connectionNameによって指定された Atlas インスタンスでホストされている MongoDB database の名前。 このデータベースの変更ストリームは、ストリーミング データソースとして機能します。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

タイムスタンプ

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

MongoDB 拡張 JSON$date または $timestamp の値を受け付けます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

変更ストリーム出力を発生元でフィルタリングするための集計パイプラインを指定します。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。

重要

各変更イベントには wallTime フィールドと clusterTime フィールドが含まれます。$source 以降の Atlas Stream Processing ステージでは、プロセッサがこれらのフィールドを取り込んだため、受信することが想定されています。Change Stream データの適切な処理を確保するために、$source.config.pipeline ではこれらのフィールドを変更しないでください。

config.maxAwaitTimeMS

integer

任意

空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。

デフォルトは 1000 です。

Atlas クラスター変更ストリーム全体からのストリーミングデータを操作するには、$source ステージのプロトタイプ形式は次のようになります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

日付 | タイムスタンプ

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

MongoDB 拡張 JSON$date または $timestamp の値を受け付けます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

変更ストリーム出力を発生元でフィルタリングするための集計パイプラインを指定します。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。

Atlas Stream Processing は、取り込まれた各変更イベントから wallTime フィールドと clusterTime フィールドを受信することが予想されていることに注意してください。Change Stream データの適切な処理を確保するために、$source.config.pipeline ではこれらのフィールドを変更しないでください。

config.maxAwaitTimeMS

integer

任意

空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。

デフォルトは 1000 です。

ドキュメントの配列を操作するために、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"documents" : [{source-doc},...] | <expression>
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

documents

配列

条件付き

ストリーミング データソースとして使用するドキュメントの配列。 このフィールドの値は、オブジェクトの配列、またはオブジェクトの配列として評価される 式 のいずれかになります。 connectionNameフィールドを使用する場合は、このフィールドを使用しないでください。

$sourceは、表示されるすべてのパイプラインの最初のステージである必要があります。 パイプラインごとに使用できる$sourceステージは 1 つだけです。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. $sourceステージでは、 という名前のトピックでこれらのレポートを収集するApachemy_weatherdata Kafkaプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれる際に公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTime に設定されます。

  2. $matchステージでは、dewPoint.value 5.0が 以下のドキュメントを除外し、 がdewPoint.value 5.0より大きいドキュメントを次のステージに渡します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

次の集計は、cluster0-collection ソースからデータを取り込み、サンプルデータセットがロードされた Atlas クラスターに接続します。ストリーム処理インスタンスを作成し、Atlas クラスターへの接続を接続レジストリに追加する方法については、Atlas Stream Processing の始め方 をご覧ください。この集計は 2 つのステージを実行して、sample_weatherdata データベースの data コレクションに対する変更ストリームを開き、変更を記録します。

  1. $source ステージは cluster0-collection ソースに接続し、sample_weatherdata データベース内の data コレクションに対して変更ストリームを開きます。

  2. $merge ステージは、フィルタリングされた変更ストリームドキュメントを、sample_weatherdata データベース内の data_changes という名前の Atlas コレクションに書き込みます。そのようなコレクションが存在しない場合、Atlas が作成します。

{
$source: {
connectionName: "cluster0-connection",
db : "sample_weatherdata",
coll : "data"
},
$merge: {
into: {
connectionName: "cluster0-connection",
db: "sample_weatherdata",
coll: "data_changes"
}
}
}

次の mongosh コマンドは data ドキュメントを削除します。

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

data ドキュメントが削除された後、ストリームプロセッサは変更ストリームイベントドキュメントを sample_weatherdata.data_changes コレクションに書き込みます。結果の sample_weatherdata.data_changes コレクション内のドキュメントを表示するには、mongosh を使用して Atlas クラスターに接続し、次のコマンドを実行してください。

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]

戻る

集計ステージ

項目一覧