Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/
Atlas
/ /

$https 集計ステージ(ストリーム プロセシング)

$https

$https ステージは、 接続レジストリにおいて HTTPS リクエストを送信するための接続を指定します。前のステージがドキュメントを $https に渡すたびに、そのステージは新しいリクエストを送信します。

特定の接続に HTTPS リクエストを送信するには、以下の手順に従います。

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>,
"parseJsonStrings": <boolean>
}
}
}

$httpsステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

接続レジストリ内で HTTPS リクエストを送信する接続を識別するラベル。

path

string |式

任意

自身の connectionName が解決する URL に追加するパス。

たとえば、https://sample.com に解決する connectionName を指定した場合、ストリーム プロセッサが https://sample.com/endpointHTTPS リクエストを送信するために、"endpoint"path を指定できます。

path を式として定義する場合、その式は string として評価される必要があります。

呼び出す API エンドポイントは、冪等である必要があります。

parameters

ドキュメント

任意

API エンドポイント呼び出しにパラメータとして渡すキーと値のペアを含むドキュメント。各キーは string である必要があり、各値は数値、string、またはブール値のいずれかに評価される必要があります。このフィールドは、値としてをサポートしています。

method

string

任意

接続のための HTTPS リクエストメソッド。次のいずれかの値でなければなりません。

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

デフォルトは "GET" です。

headers

ドキュメント

任意

API エンドポイントにヘッダーとして渡すためのキーと値のペアを含むドキュメント。各キーは string である必要があり、各値は string として評価される必要があります。このフィールドは、値としてをサポートしています。

APIエンドポイントが認証を必要とする場合(例えば、APIキーやベアラーアクセストークン認証など)、接続を定義する際に認証情報をヘッダーとして追加し、これらがこの演算子の一部としてプレーンテキストで提供されないようにする必要があります。

無効なHTTPヘッダー名と値はAPIエンドポイントに送信されません。代わりに、これらは無視されます。

無効なHTTPヘッダーの詳細については、「 RFC9110 を参照してください。

値の式が失敗するか、string 以外の型に評価された場合、メッセージはデッドレターキュー (DLQ) に送られ、演算子はこのリクエストを API エンドポイントに送信しません。

as

string

必須

REST API 応答のフィールド名。

エンドポイントが 0 バイトを返す場合、演算子は as フィールドを設定しません。

演算子は、Content-Typeapplication/json または text/plain の応答をサポートします。API エンドポイントが異なる Content-Type の応答を返す場合、演算子は定義された onError の動作に基づいてドキュメントを処理します。

APIエンドポイントが Content-Type を定義していない応答を返した場合、演算子は、その応答が application/json であると見なします。

onError

string

任意

HTTPS 関連の障害が発生した場合の演算子の動作。次のいずれかの値でなければなりません。

  • "dlq" : 影響を受けるドキュメントをデッドレターキュー (DLQ)に渡します。

  • "ignore" : エラーを無視し、影響を受けたドキュメントを次のパイプラインステージに渡します。

  • "fail" : エラーが発生した場合、ストリーム プロセッサを終了させます。

演算子は、すべての 2XX HTTP ステータスコードを成功と見なします。演算子が応答で以下の HTTP ステータスコードのいずれかを受信した場合、演算子は、このフィールドで指定した値に基づいて動作を行います。

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

演算子は、他のすべての HTTP ステータス コードを "fail" エラーと見なします。たとえば、API エンドポイントが 500 HTTP ステータスコードを返す場合、プロセッサは失敗状態になり、停止します。

onError は、無効な式などの $https 演算子自体の誤った構成から生じるエラーにはトリガーされません。

デフォルトは "dlq" です。

payload

配列

任意

API エンドポイントに送信するリクエストボディをカスタマイズできるカスタム内部パイプライン。payload は次の式をサポートしています。

  • $project

  • $addFields

  • $replaceRoot

  • $set

デフォルトでは、メッセージ全体が API エンドポイントに送信されます。Atlas Stream Processing は、緩和モードの JSON ペイロードを API エンドポイントに送信します。

無効な HTTP リクエストボディは API エンドポイントに送信されません。代わりに、これらはデッドレターキューに送られます。

無効な HTTP リクエストボディの詳細については、「RFC 9110」を参照してください。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.connectionTimeoutSec

integer

任意

成功した HTTPS 接続が応答を受け取らなかった場合、タイムアウトするまでの時間(秒単位)。

デフォルトは 30 です。

config.parseJsonStrings

ブール値

任意

Atlas がサーバー応答を再帰的に反復処理し、有効な JSON (エスケープ引用符を使用)を含む string 値を有効なBSONに直列化するかどうかを決定する設定で、下流のパイプラインステージで 結果を操作できるようにします。

デフォルトは false です。

config.requestTimeoutSec

integer

任意

接続できない場合に HTTPS リクエストがタイムアウトするまでの時間(秒単位)。

デフォルトは 60 です。

$https$source ステージの後、および $emit ステージまたは $merge ステージの前に配置する必要があります。$https を、$hoppingWindow または $tumblingWindow の内部パイプラインで使用できます。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. $sourceステージでは、 という名前のトピックでこれらのレポートを収集するApachemy_weatherdata Kafkaプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれる際に公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTime に設定されます。

  2. Apache Kafkaプロバイダーからの各レコードに対して、 $httpsステージはhttps_weather 接続で定義された HTTPSposition.coordinates 気象ソースにリクエストを送信します。リクエストでは、HTTPSリクエストのレコードの を使用して、そのロケーションの 7 日間の高温度予測値を摂氏単位で収集し、airTemperatureForecast フィールドのパイプラインドキュメントに追加します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$validate

項目一覧