定義
$https
ステージは、 接続レジストリにおいて HTTPS
リクエストを送信するための接続を指定します。前のステージがドキュメントを $https
に渡すたびに、そのステージは新しいリクエストを送信します。
構文
特定の接続に HTTPS
リクエストを送信するには、以下の手順に従います。
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer>, "parseJsonStrings": <boolean> } } }
$https
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 必須 | 接続レジストリ内で |
| string |式 | 任意 | 自身の たとえば、
呼び出す API エンドポイントは、冪等である必要があります。 |
| ドキュメント | 任意 | API エンドポイント呼び出しにパラメータとして渡すキーと値のペアを含むドキュメント。各キーは string である必要があり、各値は数値、string、またはブール値のいずれかに評価される必要があります。このフィールドは、値として式をサポートしています。 |
| string | 任意 | 接続のための HTTPS リクエストメソッド。次のいずれかの値でなければなりません。
デフォルトは |
| ドキュメント | 任意 | API エンドポイントにヘッダーとして渡すためのキーと値のペアを含むドキュメント。各キーは string である必要があり、各値は string として評価される必要があります。このフィールドは、値として式をサポートしています。 APIエンドポイントが認証を必要とする場合(例えば、APIキーやベアラーアクセストークン認証など)、接続を定義する際に認証情報をヘッダーとして追加し、これらがこの演算子の一部としてプレーンテキストで提供されないようにする必要があります。 無効なHTTPヘッダー名と値はAPIエンドポイントに送信されません。代わりに、これらは無視されます。 無効なHTTPヘッダーの詳細については、「 RFC9110 を参照してください。 値の式が失敗するか、string 以外の型に評価された場合、メッセージはデッドレターキュー (DLQ) に送られ、演算子はこのリクエストを API エンドポイントに送信しません。 |
| string | 必須 | REST API 応答のフィールド名。 エンドポイントが 0 バイトを返す場合、演算子は 演算子は、 APIエンドポイントが |
| string | 任意 |
演算子は、すべての
演算子は、他のすべての HTTP ステータス コードを
デフォルトは |
| 配列 | 任意 | API エンドポイントに送信するリクエストボディをカスタマイズできるカスタム内部パイプライン。
デフォルトでは、メッセージ全体が API エンドポイントに送信されます。Atlas Stream Processing は、緩和モードの JSON ペイロードを API エンドポイントに送信します。 無効な HTTP リクエストボディは API エンドポイントに送信されません。代わりに、これらはデッドレターキューに送られます。 無効な HTTP リクエストボディの詳細については、「RFC 9110」を参照してください。 |
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 |
| integer | 任意 | 成功した デフォルトは |
| ブール値 | 任意 | Atlas がサーバー応答を再帰的に反復処理し、有効な JSON (エスケープ引用符を使用)を含む string 値を有効なBSONに直列化するかどうかを決定する設定で、下流のパイプラインステージで 結果を操作できるようにします。 デフォルトは |
| integer | 任意 | 接続できない場合に デフォルトは |
動作
$https
は $source
ステージの後、および $emit ステージまたは $merge
ステージの前に配置する必要があります。$https
を、$hoppingWindow
または $tumblingWindow
の内部パイプラインで使用できます。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
$source
ステージでは、 という名前のトピックでこれらのレポートを収集するApachemy_weatherdata
Kafkaプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれる際に公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTime
に設定されます。Apache Kafkaプロバイダーからの各レコードに対して、
$https
ステージはhttps_weather
接続で定義された HTTPSposition.coordinates
気象ソースにリクエストを送信します。リクエストでは、HTTPSリクエストのレコードの を使用して、そのロケーションの 7 日間の高温度予測値を摂氏単位で収集し、airTemperatureForecast
フィールドのパイプラインドキュメントに追加します。$merge
ステージは、sample_weatherstream
データベース内のstream
という名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
結果のsample_weatherstream.stream
コレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。