定义
$https
阶段会在连接注册表中指定一个连接,以向其发送 HTTPS
请求。每当某一先前的阶段将文档传递给 $https
时,该阶段均会发送一个新请求。
语法
要向给定连接发送 HTTPS
请求,请执行以下操作:
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer>, "parseJsonStrings": <boolean> } } }
$https
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
| 字符串 | 必需 | 用于标识在连接注册表中要向其发送 |
| 字符串 |表达式 | Optional | 要追加到您的 例如,如果您指定了一个可解析为 如果您将 您调用的 API 终结点应该是幂等的。 |
| 文档 | Optional | 包含键值对以作为参数传递给 API 终结点调用的文档。每个键必须是字符串,且每个值的计算结果必须为数字值、字符串值或布尔值。该字段支持将表达式作为值。 |
| 字符串 | Optional | 用于您连接的 HTTPS 请求方法。必须是以下值之一:
默认值为 |
| 文档 | Optional | |
| 字符串 | 必需 | REST API 响应的对应字段的名称。 如果此终结点返回 0 个字节,该操作符则不会设置 操作符支持具有 如果此 API 终结点返回的响应没有定义 |
| 字符串 | Optional | 当此操作符遇到与
此操作符会将所有
此操作符会将所有其他 HTTP 状态代码视为
默认值为 |
| 阵列 | Optional | |
| 文档 | Optional | 包含可覆盖各种默认值的字段的文档。 |
| 整型 | Optional | 以秒为单位的时间,而成功的 默认值为 |
| 布尔 | Optional | 设置,用于确定Atlas是否递归遍历服务器响应,并将包含有效 JSON (使用转义引号)的字符串值序列化为有效BSON ,以便您可以在下游管道阶段使用结果。 默认值为 |
| 整型 | Optional | 以秒为单位的时间,而当 默认值为 |
行为
$https
必须在 $source
阶段之后,同时必须在 $emit 或 $merge
阶段之前。您可在 $hoppingWindow
或 $tumblingWindow
内部管道中使用 $https
。
示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source
阶段与在名为my_weatherdata
的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖它投影的时间戳字段的名称,将其设置为ingestionTime
。对于来自 Apache Kafka 代理的每条记录,
$https
阶段均会向https_weather
连接中定义的 HTTPS 天气源发送请求。该请求使用 HTTPS 请求中记录的position.coordinates
来收集该位置的七天高温预报(以摄氏度为单位),并将其添加到airTemperatureForecast
字段中的管道文档。$merge
阶段将输出写入sample_weatherstream
数据库中名为stream
的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
要查看生成的 sample_weatherstream.stream
集合中的文档,请连接到您的 Atlas 集群并运行以下命令:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。