정의
$https
단계는 HTTPS
요청을 보낼 연결 레지스트리의 연결을 지정합니다. 이전 단계에서 $https
에 문서를 전달할 때마다 이 단계는 새 요청을 보냅니다.
구문
지정된 연결에 HTTPS
요청을 보내려면:
{ "$https": { "connectionName": "<registered-connection>", "path" : "<subpath>" | <expression>, "parameters" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "method" : "<GET | POST | PUT | PATCH | DELETE>", "headers" : { "<key1>" : "<val1>", . . . "<keyn>" : "<valn>" }, "as" : "response", "onError" : "<DLQ | Ignore | Fail>", "payload" : [{ <inner-pipeline> }], "config" : { "connectionTimeoutSec" : <integer>, "requestTimeoutSec" : <integer>, "parseJsonStrings": <boolean> } } }
$https
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 필수 사항 |
|
| 문자열 | 표현식 | 옵션 |
예를 들어,
호출하는 API 엔드포인트는 멱등적이어야 합니다. |
| 문서 | 옵션 | API 엔드포인트 호출에 매개변수로 전달할 키-값 쌍을 포함한 문서입니다. 각 키는 문자열이어야 하며, 각 값은 숫자, 문자열 또는 불리언 값으로 평가되어야 합니다. 이 필드는 표현식을 값으로 지원합니다. |
| 문자열 | 옵션 | 연결에 대한 HTTPS 요청 방법입니다. 다음 값 중 하나여야 합니다.
기본값은 |
| 문서 | 옵션 | API 엔드포인트에 헤더로 전달할 키-값 쌍을 포함한 문서입니다. 각 키는 문자열이어야 하며 각 값은 문자열로 평가되어야 합니다. 이 필드는 표현식을 값으로 지원합니다. API 엔드포인트에 API 키 또는 Bearer Access Token 인증과 같은 인증이 필요한 경우, 연결을 정의할 때 인증 세부 정보를 헤더로 추가하여 이 연산자의 일부에 일반 텍스트로 제공되지 않도록 해야 합니다. 잘못된 HTTP 헤더 이름과 값은 API 엔드포인트로 전송되지 않습니다. 대신 이러한 항목은 무시됩니다. 유효하지 않은 HTTP headers 에 대해 자세히 학습하려면 RFC 9110를 참조하세요. 값의 표현식이 실패하거나 문자열 이외의 유형으로 평가되면 메시지는 데드 레터 큐로 전송되고 연산자는 이 요청을 API 엔드포인트로 전송하지 않습니다. |
| 문자열 | 필수 사항 | REST API 응답의 필드 이름입니다. 엔드포인트가 0바이트를 반환하면, 연산자는 연산자는 API 엔드포인트가 |
| 문자열 | 옵션 | 연산자에
연산자는 모든
연산자는 다른 HTTP 상태 코드를
기본값은 |
| 배열 | 옵션 | API 엔드포인트로 전송되는 요청 본문을 사용자 지정할 수 있는 맞춤형 내부 파이프라인입니다.
기본적으로 전체 메시지가 API 엔드포인트로 전송됩니다. Atlas Stream Processing은 API 엔드포인트로 완화 모드 JSON 페이로드를 보냅니다. 유효하지 않은 HTTP 요청 본문은 API 엔드포인트로 전송되지 않습니다. 대신, 이러한 메시지는 데드 레터 큐로 전송됩니다. 유효하지 않은 HTTP 요청 본문에 대해 자세히 학습하려면 RFC 9110를 참조하세요. |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| integer | 옵션 | 응답을 받지 못할 경우 성공적인 기본값은 |
| 부울 | 옵션 | Atlas 서버 응답을 재귀적으로 반복할지 여부를 결정하고 다운스트림 파이프라인 단계에서 결과로 작업할 수 있도록 유효한 JSON (이스케이프된 따옴표 사용)이 포함된 문자열 값을 유효한 BSON 으로 직렬화하는 설정입니다. 기본값은 |
| integer | 옵션 | 연결되지 않을 경우 기본값은 |
행동
$https
는 $source
단계 뒤에 오고, $emit 또는 $merge
단계 앞에 와야 합니다. $https
를 $hoppingWindow
또는 $tumblingWindow
내부 파이프라인에서 사용할 수 있습니다.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
단계는
$source
이라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을my_weatherdata
설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을 재정의하여ingestionTime
로 설정합니다.Apache Kafka 브로커의 각 기록 에
$https
https_weather
대해 단계에서는 연결에 정의된 HTTPS 날씨 소스로 요청 보냅니다. 요청 HTTPS 요청 의 기록 에서position.coordinates
를 사용하여 해당 위치 의 7일 최고 기온 예보를 섭씨 단위로 수집하고, 이를 파이프라인 문서 의airTemperatureForecast
필드 에 추가합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$https': { connectionName: 'https_weather', path: 'forecast', parameters: { latitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 0 ] }, longitude: { $arrayElemAt: ['$$ROOT.position.coordinates', 1 ] } }, as: 'airTemperatureForecast' }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, airTemperatureForecast: [22.3, 22.4, 22.5, 22.3, 22.4, 22.5, 23.1], atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 30.27, -97.74], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.