Docs Menu
Docs Home
/
Atlas
/ /

$https 애그리게이션 단계(스트림 처리)

$https

$https 단계는 HTTPS 요청을 보낼 연결 레지스트리의 연결을 지정합니다. 이전 단계에서 $https에 문서를 전달할 때마다 이 단계는 새 요청을 보냅니다.

지정된 연결에 HTTPS 요청을 보내려면:

{
"$https": {
"connectionName": "<registered-connection>",
"path" : "<subpath>" | <expression>,
"parameters" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"method" : "<GET | POST | PUT | PATCH | DELETE>",
"headers" : {
"<key1>" : "<val1>",
. . .
"<keyn>" : "<valn>"
},
"as" : "response",
"onError" : "<DLQ | Ignore | Fail>",
"payload" : [{
<inner-pipeline>
}],
"config" : {
"connectionTimeoutSec" : <integer>,
"requestTimeoutSec" : <integer>,
"parseJsonStrings": <boolean>
}
}
}

$https 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

필수 사항

HTTPS 요청을 보낼 연결 레지스트리의 연결을 식별하는 레이블입니다.

path

문자열 | 표현식

옵션

connectionName이 확인하는 URL에 추가할 경로입니다.

예를 들어, https://sample.com로 확인되는 connectionName을 지정하면, 스트림 프로세서가 https://sample.com/endpointHTTPS 요청을 보내도록 "endpoint"path를 지정할 수 있습니다.

path를 표현식으로 정의하면, 이 표현식은 문자열로 평가되어야 합니다.

호출하는 API 엔드포인트는 멱등적이어야 합니다.

parameters

문서

옵션

API 엔드포인트 호출에 매개변수로 전달할 키-값 쌍을 포함한 문서입니다. 각 키는 문자열이어야 하며, 각 값은 숫자, 문자열 또는 불리언 값으로 평가되어야 합니다. 이 필드는 표현식을 값으로 지원합니다.

method

문자열

옵션

연결에 대한 HTTPS 요청 방법입니다. 다음 값 중 하나여야 합니다.

  • "GET"

  • "POST"

  • "PUT"

  • "PATCH"

  • "DELETE"

기본값은 "GET"입니다.

headers

문서

옵션

API 엔드포인트에 헤더로 전달할 키-값 쌍을 포함한 문서입니다. 각 키는 문자열이어야 하며 각 값은 문자열로 평가되어야 합니다. 이 필드는 표현식을 값으로 지원합니다.

API 엔드포인트에 API 키 또는 Bearer Access Token 인증과 같은 인증이 필요한 경우, 연결을 정의할 때 인증 세부 정보를 헤더로 추가하여 이 연산자의 일부에 일반 텍스트로 제공되지 않도록 해야 합니다.

잘못된 HTTP 헤더 이름과 값은 API 엔드포인트로 전송되지 않습니다. 대신 이러한 항목은 무시됩니다.

유효하지 않은 HTTP headers 에 대해 자세히 학습하려면 RFC 9110를 참조하세요.

값의 표현식이 실패하거나 문자열 이외의 유형으로 평가되면 메시지는 데드 레터 큐로 전송되고 연산자는 이 요청을 API 엔드포인트로 전송하지 않습니다.

as

문자열

필수 사항

REST API 응답의 필드 이름입니다.

엔드포인트가 0바이트를 반환하면, 연산자는 as 필드를 설정하지 않습니다.

연산자는 Content-Typeapplication/json 또는 text/plain인 응답을 지원합니다. API 엔드포인트가 다른 Content-Type을 가진 응답을 반환하면, 연산자는 사용자가 정의한 onError 동작에 따라 문서를 처리합니다.

API 엔드포인트가 Content-Type이 정의되지 않은 상태로 응답을 반환하면, 연산자는 그 응답이 application/json이라고 가정합니다.

onError

문자열

옵션

연산자에 HTTPS 관련 오류가 발생할 때의 동작입니다. 다음 값 중 하나여야 합니다.

  • "dlq" : 영향을 받은 문서를 데드 레터 큐로 전달합니다.

  • "ignore" : 오류를 무시하고, 영향을 받은 문서를 다음 파이프라인 단계로 전달합니다.

  • "fail" : 오류가 발생하면 스트림 프로세서를 종료합니다.

연산자는 모든 2XX HTTP 상태 코드를 성공으로 간주합니다. 연산자가 다음 HTTP 상태 코드 중 하나를 응답으로 받으면, 연산자는 이 필드에 제공한 값에 따라 행동을 결정합니다:

  • 400

  • 404

  • 410

  • 413

  • 414

  • 431

연산자는 다른 HTTP 상태 코드를 "fail" 오류로 간주합니다. 예를 들어, API 엔드포인트가 500 HTTP 상태 코드를 반환하면 프로세서는 실패 상태에 들어가고 중지됩니다.

onError$https 연산자 자체의 잘못된 구성으로 인해 발생하는 오류(예: 잘못된 표현식)에 대해서는 트리거되지 않습니다.

기본값은 "dlq"입니다.

payload

배열

옵션

API 엔드포인트로 전송되는 요청 본문을 사용자 지정할 수 있는 맞춤형 내부 파이프라인입니다. payload는 다음 표현식을 지원합니다.

  • $project

  • $addFields

  • $replaceRoot

  • $set

기본적으로 전체 메시지가 API 엔드포인트로 전송됩니다. Atlas Stream Processing은 API 엔드포인트로 완화 모드 JSON 페이로드를 보냅니다.

유효하지 않은 HTTP 요청 본문은 API 엔드포인트로 전송되지 않습니다. 대신, 이러한 메시지는 데드 레터 큐로 전송됩니다.

유효하지 않은 HTTP 요청 본문에 대해 자세히 학습하려면 RFC 9110를 참조하세요.

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.connectionTimeoutSec

integer

옵션

응답을 받지 못할 경우 성공적인 HTTPS 연결의 제한 시간이 초과되기까지의 시간(초 단위)입니다.

기본값은 30입니다.

config.parseJsonStrings

부울

옵션

Atlas 서버 응답을 재귀적으로 반복할지 여부를 결정하고 다운스트림 파이프라인 단계에서 결과로 작업할 수 있도록 유효한 JSON (이스케이프된 따옴표 사용)이 포함된 문자열 값을 유효한 BSON 으로 직렬화하는 설정입니다.

기본값은 false입니다.

config.requestTimeoutSec

integer

옵션

연결되지 않을 경우 HTTPS 요청의 제한 시간이 초과되기까지의 시간(초 단위)입니다.

기본값은 60입니다.

$https$source 단계 뒤에 오고, $emit 또는 $merge 단계 앞에 와야 합니다. $https$hoppingWindow 또는 $tumblingWindow 내부 파이프라인에서 사용할 수 있습니다.

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. 단계는 $source 이라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을 my_weatherdata 설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을 재정의하여 ingestionTime로 설정합니다.

  2. Apache Kafka 브로커의 각 기록 에 $https https_weather 대해 단계에서는 연결에 정의된 HTTPS 날씨 소스로 요청 보냅니다. 요청 HTTPS 요청 의 기록 에서 position.coordinates 를 사용하여 해당 위치 의 7일 최고 기온 예보를 섭씨 단위로 수집하고, 이를 파이프라인 문서 의 airTemperatureForecast 필드 에 추가합니다.

  3. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$https': {
connectionName: 'https_weather',
path: 'forecast',
parameters: {
latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] },
longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] }
},
as: 'airTemperatureForecast'
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1],
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 30.27, -97.74], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

돌아가기

$validate

이 페이지의 내용