Docs 菜单
Docs 主页
/
Atlas
/ /

$https

$https

$https 阶段会在连接注册表中指定一个连接,以向其发送 HTTPS 请求。每当某一先前的阶段将文档传递给 $https 时,该阶段均会发送一个新请求。

要向给定连接发送 HTTPS 请求,请执行以下操作:

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>,
"parseJsonStrings": <boolean>
}
}
}

$https 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

用于标识在连接注册表中要向其发送 HTTPS 请求的连接的标签。

path

字符串 |表达式

Optional

要追加到您的 connectionName 所解析为的 URL 的路径。

例如,如果您指定了一个可解析为 https://sample.comconnectionName,则可指定 "endpoint"path,以便您的流处理器向 https://sample.com/endpoint 发送 HTTPS 请求。

如果您将 path 定义为表达式,则该表达式的计算结果必须为字符串。

您调用的 API 终结点应该是幂等的。

parameters

文档

Optional

包含键值对以作为参数传递给 API 终结点调用的文档。每个键必须是字符串,且每个值的计算结果必须为数字值、字符串值或布尔值。该字段支持将表达式作为值。

method

字符串

Optional

用于您连接的 HTTPS 请求方法。必须是以下值之一:

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

默认值为 "GET"

headers

文档

Optional

包含作为标头传递给 API 终结点的键值对的文档。每个键必须为字符串,且每个值的计算结果必须为字符串。该字段支持将表达式作为值。

如果 API 终结点需要身份验证,例如 API 密钥或 Bearer 访问令牌身份验证,您应在定义连接时将身份验证详细信息作为标头添加,以防止在此操作符中以明文形式提供这些信息。

无效的 HTTP 标头名称和值不会被发送到此 API 终结点。相反,它们会被忽略。

要了解有关无效 HTTP 标头的更多信息,请参阅 RFC 9110

如果值中的表达式失败或计算结果为字符串以外的类型,则该消息将发送到死信队列(DLQ),并且操作符不会将此请求发送到API端点。

as

字符串

必需

REST API 响应的对应字段的名称。

如果此终结点返回 0 个字节,该操作符则不会设置 as 字段。

操作符支持具有 Content-Typeapplication/jsontext/plain 的响应。如果 API 终结点返回的响应具有不同的 Content-Type,操作符将根据您定义的 onError 行为来处理文档。

如果此 API 终结点返回的响应没有定义 Content-Type,该操作符则假定此响应为 application/json

onError

字符串

Optional

当此操作符遇到与 HTTPS 相关的故障时的行为。必须是以下值之一:

此操作符会将所有 2XX HTTP 状态码视为成功。如果此操作符在响应中收到以下任意 HTTP 状态代码,此操作符则会根据您在该字段中提供的值执行相应的行为:

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

此操作符会将所有其他 HTTP 状态代码视为 "fail" 错误。例如,如果此 API 终结点返回 500 HTTP 状态代码,处理器则会变为失败状态并停止。

onError 不会因 $https 操作符本身配置不当(例如,无效表达式)而触发错误。

默认值为 "dlq"

payload

阵列

Optional

自定义内部管道,允许您自定义发送到 API 终结点的请求正文。payload 支持以下表达式:

  • $project

  • $addFields

  • $replaceRoot

  • $set

默认情况下,整个消息均会发送到此 API 终结点。Atlas Stream Processing 会向此 API 终结点发送宽松模式 JSON 有效负载。

无效的HTTP请求正文不会发送到API端点。相反,它们会被发送到死信队列(DLQ)。

要了解有关无效 HTTP 请求正文的更多信息,请参阅 RFC 9110

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.connectionTimeoutSec

整型

Optional

以秒为单位的时间,而成功的 HTTPS 连接在未收到响应时会在超过此时间后超时。

默认值为 30

config.parseJsonStrings

布尔

Optional

设置,用于确定Atlas是否递归遍历服务器响应,并将包含有效 JSON (使用转义引号)的字符串值序列化为有效BSON ,以便您可以在下游管道阶段使用结果。

默认值为 false

config.requestTimeoutSec

整型

Optional

以秒为单位的时间,而当 HTTPS 请求无法连接时会在超过此时间后超时。

默认值为 60

$https 必须在 $source 阶段之后,同时必须在 $emit$merge 阶段之前。您可在 $hoppingWindow$tumblingWindow 内部管道中使用 $https

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source 阶段与在名为 my_weatherdata 的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖它投影的时间戳字段的名称,将其设置为 ingestionTime

  2. 对于来自 Apache Kafka 代理的每条记录,$https 阶段均会向 https_weather 连接中定义的 HTTPS 天气源发送请求。该请求使用 HTTPS 请求中记录的 position.coordinates 来收集该位置的七天高温预报(以摄氏度为单位),并将其添加到 airTemperatureForecast 字段中的管道文档。

  3. $merge 阶段将输出写入 sample_weatherstream 数据库中名为 stream 的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

后退

$validate

在此页面上