定義
$source
ステージでは、データをストリーミングするための接続レジストリで接続を指定します。 次の接続タイプがサポートされています。
Apache Kafka エージェント
MongoDB コレクションの変更ストリーム
MongoDB database 変更ストリーム
ドキュメント配列
注意
Atlas サーバーレスインスタンスを$source
として使用することはできません。
構文
Apache Kafka ブロック
Apache Kafkaプロバイダーからのストリーミングデータを使用する場合、$source
ステージには次のプロトタイプ形式があります。
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 | |
---|---|---|---|---|
| string | 必須 | データを取り込む接続レジストリ内の接続を識別するラベル。 | |
| 文字列または複数の文字列の配列 | 必須 | メッセージをストリーミング配信する 1 つ以上の Apache Kafka トピックの名前。複数のトピックからのメッセージをストリーミングする場合は、配列で指定します。 | |
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
| |
| ドキュメント | 任意 | 証明機関の計算で無視される前に、パーティションがアイドル状態になることを許可する時間を指定するドキュメント。 このフィールドはデフォルトで無効になっています。アイドル状態で進まないパーティションを処理するには、このフィールドに値を設定します。 | |
| integer | 任意 | パーティションのアイドル タイムアウトの期間を指定する数値。 | |
| string | 任意 | パーティション アイドル タイムアウトの期間の単位。
| |
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 | |
| string | 任意 | Apache Kafkaソーストピック内のどのイベントで取り込みを開始するかを指定します。
デフォルトは | |
| string | 任意 | ストリーム プロセッサに関連付ける Kafka コンシューマー グループの ID。 省略した場合、Atlas Stream Processing は、ストリーム プロセシング インスタンスを次の形式の自動生成された ID に関連付けます。
Atlas Stream Processing は、チェックポイントがコミットされたKafkaに、指定されたグループ IDします。オフセットを超えるメッセージがチェックポイントに永続的に記録されると、オフセットをコミットします。これにより、ストリーム プロセッサのオフセット ラグと進行状況をKafkaプロバイダー コンシューマー グループメタデータから直接追跡できます。 | |
| string | 任意 | Apache Kafkaキー データを逆直列化するために使用されるデータ型。次のいずれかの値である必要があります。
デフォルトは | |
| string | 任意 | Apache Kafkaキー データを逆直列化するときに発生したエラーの処理方法。次のいずれかの値である必要があります。
|
注意
Atlas Stream Processing では、ソース データ ストリーム内のドキュメントが有効なjson
またはejson
である必要があります。 Atlas Stream Processing は、この要件を満たさないドキュメントをデッド レター キューに設定します(デッド レター キューを設定している場合)。
MongoDB コレクションの変更ストリーム
Atlas コレクションの変更ストリームを使用すると、アプリケーションは単一のコレクションにおけるリアルタイムデータの変更にアクセスできます。コレクションに対する変更ストリームを開く方法については、変更ストリームを参照してください。
Atlas コレクションの変更ストリームからのストリーミング データを操作する場合、 $source
ステージには次のプロトタイプ形式があります。
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "coll" : ["<source-coll>",...], "initialSync": { "enable": <boolean>, "parallelism": <integer> }, "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }], "maxAwaitTimeMS": <time-ms>, "readPreference": "<read-preference>", "readPreferenceTags": [ {"<key>": "<value>"}, . . . ] } } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 条件付き | データを取り込む接続レジストリ内の接続を識別するラベル。 |
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
|
| string | 必須 |
|
| 文字列または複数の文字列の配列 | 必須 |
|
| ドキュメント | 任意 |
Atlas Stream Processing
重要
|
| ブール値 | 条件付き |
|
| integer | 任意 |
|
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 |
| token | 条件付き | ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。
|
| タイムスタンプ | 条件付き | ソースがレポートを開始するoptime 。
MongoDB 拡張 JSON の |
| string | 条件付き | 変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。
fullDocument の値を指定しない場合、デフォルトは コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ブール値 | 条件付き | すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| string | 任意 | 変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。
コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ドキュメント | 任意 | 集計パイプラインを指定し、変更ストリーム出力がパスされ、さらなるプロセシングが実行される前にフィルタリングします。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。 重要各変更イベントには |
| ドキュメント | 任意 | 操作の読み込み設定(read デフォルトは |
| ドキュメント | 任意 | 操作の 読み込み設定(read preference)タグ。 |
| integer | 任意 | 空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。 デフォルトは |
MongoDB Database Change Stream
Atlas データベースの変更ストリームを使用すると、アプリケーションは単一のデータベースでリアルタイムデータの変更にアクセスできます。データベースに対して変更ストリームを開く方法については、変更ストリームを参照してください。
Atlas データベース変更ストリームからのストリーミング データを操作する場合、 $source
ステージには次のプロトタイプ形式があります。
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 条件付き | データを取り込む接続レジストリ内の接続を識別するラベル。 |
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
|
| string | 必須 |
|
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 |
| token | 条件付き | ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。
|
| タイムスタンプ | 条件付き | ソースがレポートを開始するoptime 。
MongoDB 拡張 JSON の |
| string | 条件付き | 変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。
fullDocument の値を指定しない場合、デフォルトは データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ブール値 | 条件付き | すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| string | 任意 | 変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。
データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ドキュメント | 任意 | 変更ストリーム出力を発生元でフィルタリングするための集計パイプラインを指定します。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。 重要各変更イベントには |
| integer | 任意 | 空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。 デフォルトは |
MongoDB クラスター全体の変更ストリームソース
Atlas クラスター変更ストリーム全体からのストリーミングデータを操作するには、$source
ステージのプロトタイプ形式は次のようになります。
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 条件付き | データを取り込む接続レジストリ内の接続を識別するラベル。 |
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
|
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 |
| token | 条件付き | ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。
|
| 日付 | タイムスタンプ | 条件付き | ソースがレポートを開始するoptime 。
MongoDB 拡張 JSON の |
| string | 条件付き | 変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。
fullDocument の値を指定しない場合、デフォルトは データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ブール値 | 条件付き | すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| string | 任意 | 変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。
データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ドキュメント | 任意 | 変更ストリーム出力を発生元でフィルタリングするための集計パイプラインを指定します。このパイプラインは「変更ストリーム出力の修正」で説明されているパラメータに準拠する必要があります。 Atlas Stream Processing は、取り込まれた各変更イベントから |
| integer | 任意 | 空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまで待機する最大時間(ミリ秒)。 デフォルトは |
ドキュメント配列
ドキュメントの配列を操作するために、 $source
ステージには次のプロトタイプ形式があります。
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "documents" : [{source-doc},...] | <expression> } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
|
| 配列 | 条件付き | ストリーミング データソースとして使用するドキュメントの配列。 このフィールドの値は、オブジェクトの配列、またはオブジェクトの配列として評価される 式 のいずれかになります。 |
動作
$source
は、表示されるすべてのパイプラインの最初のステージである必要があります。 パイプラインごとに使用できる$source
ステージは 1 つだけです。
例
Kafka の例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
$source
ステージでは、 という名前のトピックでこれらのレポートを収集するApachemy_weatherdata
Kafkaプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれる際に公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTime
に設定されます。$match
ステージでは、dewPoint.value
5.0
が 以下のドキュメントを除外し、 がdewPoint.value
5.0
より大きいドキュメントを次のステージに渡します。$merge
ステージは、sample_weatherstream
データベース内のstream
という名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
結果のsample_weatherstream.stream
コレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。
変更ストリームの例
次の集計は、cluster0-collection
ソースからデータを取り込み、サンプルデータセットがロードされた Atlas クラスターに接続します。ストリーム処理インスタンスを作成し、Atlas クラスターへの接続を接続レジストリに追加する方法については、Atlas Stream Processing の始め方 をご覧ください。この集計は 2 つのステージを実行して、sample_weatherdata
データベースの data
コレクションに対する変更ストリームを開き、変更を記録します。
$source
ステージはcluster0-collection
ソースに接続し、sample_weatherdata
データベース内のdata
コレクションに対して変更ストリームを開きます。$merge
ステージは、フィルタリングされた変更ストリームドキュメントを、sample_weatherdata
データベース内のdata_changes
という名前の Atlas コレクションに書き込みます。そのようなコレクションが存在しない場合、Atlas が作成します。
{ $source: { connectionName: "cluster0-connection", db : "sample_weatherdata", coll : "data" }, $merge: { into: { connectionName: "cluster0-connection", db: "sample_weatherdata", coll: "data_changes" } } }
次の mongosh
コマンドは data
ドキュメントを削除します。
db.getSiblingDB("sample_weatherdata").data.deleteOne( { _id: ObjectId("5553a99ae4b02cf715120e4b") } )
data
ドキュメントが削除された後、ストリームプロセッサは変更ストリームイベントドキュメントを sample_weatherdata.data_changes
コレクションに書き込みます。結果の sample_weatherdata.data_changes
コレクション内のドキュメントを表示するには、mongosh
を使用して Atlas クラスターに接続し、次のコマンドを実行してください。
db.getSiblingDB("sample_weatherdata").data_changes.find()
[ { _id: { _data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004' }, clusterTime: Timestamp({ t: 1738790819, i: 1 }), documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') }, ns: { db: 'sample_weatherdata', coll: 'data' }, operationType: 'delete', wallTime: ISODate('2025-02-05T21:26:59.313Z') } ]