정의
$emit
단계는 메시지를 내보낼 연결 레지스트리 의 연결을 지정합니다. 연결은 Apache Kafka 브로커 또는 time series 컬렉션 하나여야 합니다.
구문
Apache Kafka 브로커
처리된 데이터를 Apache Kafka 브로커에 쓰기 (write) 려면 다음 프로토타입 형식의 $emit
파이프라인 단계를 사용합니다.
{ "$emit": { "connectionName": "<registered-connection>", "topic": "<target-topic>" | <expression>, "config": { "acks": <number-of-acknowledgements>, "compression_type": "<compression-type>", "dateFormat": "default" | "ISO8601", "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<serialization-type>", "outputFormat": "<json-format>", "tombstoneWhen": <expression> } } }
$emit
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 | |||||
---|---|---|---|---|---|---|---|---|
| 문자열 | 필수 사항 | 데이터를 수집할 연결의 이름은 연결 레지스트리에 나타나는 대로입니다. | |||||
| 문자열 | 표현식 | 필수 사항 | 메시지를 보낼 Apache Kafka 주제 의 이름입니다. | |||||
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. | |||||
| int | 옵션 | ||||||
| 문자열 | 옵션 | 생성자가 생성한 모든 데이터에 대한 압축 유형입니다. 기본값은 없음(즉, 압축하지 않음)입니다. 유효한 값은 다음과 같습니다.
압축은 전체 데이터 배치에 사용되므로 배치의 효율성이 압축 비율에 영향을 미칩니다. 배치 처리가 많을수록 더 나은 압축이 가능합니다. | |||||
| 문자열 | 옵션 | 날짜 값의 날짜 형식입니다. 유효한 값은 다음과 같습니다.
예를 들면 다음과 같습니다. 다음 입력을 고려하세요.
| |||||
| 표현식 | 옵션 | 출력 메시지에 추가할 헤더입니다. 표현식은 객체 또는 배열로 평가되어야 합니다. 표현식이 객체로 평가되는 경우, Atlas Stream Processing은 해당 객체의 각 키-값 쌍에서 헤더를 구성합니다. 여기서 키는 헤더 이름이고 값은 헤더 값입니다. 표현식이 배열로 평가되는 경우 키-값 쌍 객체배열 형식을 취해야 합니다. 예시:
Atlas Stream Processing은 배열의 각 객체로부터 헤더를 구성하며 이때 키는 헤더 이름이고 값은 헤더 값입니다. Atlas Stream Processing은 다음 유형의 헤더 값을 지원합니다.
| |||||
| 객체 | 문자열 | 표현식 | 옵션 | Apache Kafka 메시지 키로 평가되는 표현식입니다.
| |||||
| 문자열 | 조건부 | Apache Kafka 키 데이터를 직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.
기본값은 | |||||
| 문자열 | 옵션 | Apache Kafka 로 메시지를 보낼 때 사용할 JSON 형식입니다.. 다음 값 중 하나여야 합니다.
기본값은 | |||||
| 표현식 | 옵션 |
표현식 부울 값으로 평가되지 않거나 평가할 수 없는 경우 Atlas Stream Processing 문서 DLQ에 씁니다.
|
Atlas Time Series 컬렉션
처리된 데이터를 Atlas time series 컬렉션에 쓰려면 다음 프로토타입 형식의 $emit
파이프라인 단계를 사용합니다.
{ "$emit": { "connectionName": "<registered-connection>", "db": "<target-db>" | <expression>, "coll": "<target-coll>" | <expression>, "timeseries": { <options> } } }
$emit
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
| 문자열 | 필수 사항 | 데이터를 수집할 연결의 이름은 연결 레지스트리에 나타나는 대로입니다. |
| 문자열 | 표현식 | 필수 사항 | 대상 time series 컬렉션 이 포함된 Atlas 데이터베이스 의 이름 또는 표현식 입니다. |
| 문자열 | 표현식 | 필수 사항 | 쓰기 (write) Atlas time series 컬렉션 의 이름 또는 표현식 입니다. |
| 문서 | 필수 사항 | 컬렉션의 time series 필드 를 정의하는 문서입니다. |
참고
time series 컬렉션 내 문서의 최대 크기는 4 MB입니다. 자세한 내용은 Time Series 컬렉션 제한 사항을 참조하세요.
AWS S3
처리된 데이터를 AWS S3 버킷 싱크 연결에 기록하려면, $emit
파이프라인 단계를 사용하고 아래의 프로토타입 형식을 따릅니다.
{ "$emit": { "connectionName": "<registered-connection>", "bucket": "<target-bucket>", "region": "<target-region>", "path": "<key-prefix>" | <expression>, "config": { "writeOptions": { "count": <doc-count>, "bytes": <threshold>, "interval": { "size": <unit-count>, "unit": "<time-denomination>" } }, "delimiter": "<delimiter>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", "compression": "gzip" | "snappy", "compressionLevel": <level> } } }
$emit
단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 | |||
---|---|---|---|---|---|---|
| 문자열 | 필수 사항 | 데이터를 쓰기 위한 연결의 이름은 연결 레지스트리에 나타나는 대로입니다. | |||
| 문자열 | 필수 사항 | 데이터를 쓰기할 S3 버킷의 이름입니다. | |||
| 문자열 | 옵션 | 대상 버킷이 위치한 AWS 리전의 이름입니다. 스트림 처리 인스턴스를 AWS 리전에 호스팅하면, 이 매개변수는 기본적으로 해당 리전으로 설정됩니다. 그 외의 경우, 스트림 처리 인스턴스 호스트 리전과 가장 가까운 AWS 리전이 기본값으로 설정됩니다. | |||
| 문자열 | 표현식 | 필수 사항 | S3 버킷에 기록된 객체 키의 접두사입니다. 리터럴 접두사 문자열이거나 문자열로 평가되는 표현식 이어야 합니다. | |||
| 문서 | 옵션 | 기본값을 재정의하는 추가 매개변수가 포함된 문서입니다. | |||
| 문서 | 옵션 | 쓰기 동작을 제어하는 추가 매개변수가 포함된 문서입니다. 이러한 매개변수는 어떤 임계값이 먼저 충족되는지에 따라 쓰기 동작을 트리거합니다. 예를 들어, 수집된 문서가 | |||
| integer | 옵션 | S3에 기록되는 각 파일에 그룹화할 문서 수입니다. | |||
| integer | 옵션 | 파일이 S3에 기록되기 전까지 누적되어야 하는 최소 바이트 수를 지정합니다. 바이트 수는 파이프라인에서 수집된 BSON 문서의 크기에 따라 결정되며, 최종 출력 파일의 크기는 고려되지 않습니다. | |||
| 문서 | 옵션 |
기본값은 1분입니다. | |||
| integer | 조건부 |
기본값은 | |||
| 문자열 | 조건부 | 대량 쓰기 타이머를 측정할 시간의 단위입니다. 이 매개변수는 다음 값을 지원합니다.
기본값은 | |||
| 문자열 | 옵션 | 내보낸 파일의 각 항목 사이의 구분자입니다. 기본값은 | |||
| 문자열 | 옵션 | S3에 기록된 JSON의 출력 형식을 지정합니다. 다음 값 중 하나여야 합니다.
기본값은 " 자세한 내용을 보려면 기본 JSON을 참조하세요. | |||
| 문자열 | 옵션 | 날짜 값의 날짜 형식입니다. 유효한 값은 다음과 같습니다.
예를 들어, 파이프라인에 다음 기록을 추가하는 경우:
만약
| |||
| 문자열 | 옵션 | 사용할 압축 알고리즘의 이름입니다. 다음 값 중 하나여야 합니다.
| |||
| 문자열 | 조건부 | 내보내는 메시지에 적용할 압축 단계입니다. 기본값은 이 매개변수는 |
Basic JSON
메시지 수집을 용이하게 하기 위해 Atlas Stream Processing은 기본 JSON을 지원하며, 이는 RelaxedJSON 형식을 간소화한 것입니다. 다음 표는 해당 필드들에 적용된 이러한 간소화 예시를 보여줍니다.
필드 유형 | relaxedJson | basicJson |
---|---|---|
바이너리 |
|
|
날짜 |
|
|
10진수 |
|
|
타임스탬프 |
|
|
ObjectId |
|
|
음의 무한대 |
|
|
양의 무한대 |
|
|
정규 표현식 |
|
|
UUID |
|
|
행동
$emit
가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $emit
단계는 하나만 사용할 수 있습니다.
스트림 프로세서당 하나의 Atlas time series 컬렉션에만 쓸 수 있습니다. 존재하지 않는 컬렉션을 지정하면 Atlas는 사용자가 지정한 time series 필드로 컬렉션을 생성합니다. 기존 데이터베이스를 지정해야 합니다.
동적 표현식 topic
, db
및 coll
필드의 값으로 사용하여 스트림 프로세서가 메시지별로 다른 대상에 쓰기 (write) 수 활성화 . 표현식 문자열로 평가되어야 합니다.
예시
다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
이러한 각 항목을 고유한 Apache Kafka 주제 로 정렬하려면 다음 $emit
단계를 쓰기 (write) 수 있습니다.
{ "$emit": { "connectionName": "kafka1", "topic": "$customerStatus" } }
이 $emit
단계는 다음과 같습니다.
VIP
이라는 주제에Very Important Industries
메시지를 씁니다.employee
이라는 주제에N. E. Buddy
메시지를 씁니다.contractor
이라는 주제에Khan Traktor
메시지를 씁니다.
동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.
아직 존재하지 않는 주제 지정하면Apache Kafka 해당 주제를 대상으로 하는 첫 번째 메시지를 받을 때 자동으로 주제 생성합니다.
동적 표현식으로 주제를 지정했지만 Atlas Stream Processing이 지정된 메시지에 대한 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 해당 메시지를 데드 레터 큐로 보내고 후속 메시지를 처리합니다. 데드 레터 큐가 구성되어 있지 않은 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
단계는
$source
이라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을my_weatherdata
설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을 재정의하여ingestionTime
로 설정합니다.$match
단계에서는airTemperature.value
값이30.0
이상인 문서를 제외하고airTemperature.value
값이30.0
미만인 문서를 다음 단계로 전달합니다.$addFields
단계는 스트림을 메타데이터로 풍부하게 합니다.$emit
단계에서는weatherStreamOutput
Kafka 브로커 연결을 통해stream
이라는 주제에 출력을 기록합니다.
{ "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } }, { "$match": { "airTemperature.value": { "$lt": 30 } } }, { "$addFields": { "processorMetadata": { "$meta": "stream" } } }, { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
stream
주제의 문서는 다음 형식을 따릅니다.
{ "st": "x+34700+119500", "position": { "type": "Point", "coordinates": [122.8, 116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" }, "_stream_meta": { "source": { "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.