Docs Menu
Docs Home
/
Atlas
/ /

$emit 애그리게이션 단계(스트림 처리)

$emit 단계는 메시지를 내보낼 연결 레지스트리 의 연결을 지정합니다. 연결은 Apache Kafka 브로커 또는 time series 컬렉션 하나여야 합니다.

처리된 데이터를 Apache Kafka 브로커에 쓰기 (write) 려면 다음 프로토타입 형식의 $emit 파이프라인 단계를 사용합니다.

{
"$emit": {
"connectionName": "<registered-connection>",
"topic": "<target-topic>" | <expression>,
"config": {
"acks": <number-of-acknowledgements>,
"compression_type": "<compression-type>",
"dateFormat": "default" | "ISO8601",
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<serialization-type>",
"outputFormat": "<json-format>",
"tombstoneWhen": <expression>
}
}
}

$emit 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

필수 사항

데이터를 수집할 연결의 이름은 연결 레지스트리에 나타나는 대로입니다.

topic

문자열 | 표현식

필수 사항

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.acks

int

옵션

성공적인 $emit 작업을 위해 Apache Kafka 클러스터 에서 필요한 승인의 수입니다.

기본값은 all입니다. Atlas Stream Processing은 다음 값을 지원합니다.

  • -1

  • 0

  • 1

  • all

config.compression_type

문자열

옵션

생성자가 생성한 모든 데이터에 대한 압축 유형입니다. 기본값은 없음(즉, 압축하지 않음)입니다. 유효한 값은 다음과 같습니다.

  • none

  • gzip

  • snappy

  • lz4

  • zstd

압축은 전체 데이터 배치에 사용되므로 배치의 효율성이 압축 비율에 영향을 미칩니다. 배치 처리가 많을수록 더 나은 압축이 가능합니다.

config.dateFormat

문자열

옵션

날짜 값의 날짜 형식입니다. 유효한 값은 다음과 같습니다.

  • default - outputFormat의 기본값을 사용합니다.

  • ISO8601 - 날짜를 ISO8601 형식의 문자열로 변환하며, 여기에는 밀리초 정밀도(YYYY-MM-DDTHH:mm:ss.sssZ)가 포함됩니다.

예를 들면 다음과 같습니다.

다음 입력을 고려하세요.

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

$emit.config.dateFormatdefault로 설정된 경우 출력은 다음과 유사하게 표시됩니다.

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

$emit.config.dateFormatISO8601로 설정된 경우 출력은 다음과 유사하게 표시됩니다.

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.headers

표현식

옵션

출력 메시지에 추가할 헤더입니다. 표현식은 객체 또는 배열로 평가되어야 합니다.

표현식이 객체로 평가되는 경우, Atlas Stream Processing은 해당 객체의 각 키-값 쌍에서 헤더를 구성합니다. 여기서 키는 헤더 이름이고 값은 헤더 값입니다.

표현식이 배열로 평가되는 경우 키-값 쌍 객체배열 형식을 취해야 합니다. 예시:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing은 배열의 각 객체로부터 헤더를 구성하며 이때 키는 헤더 이름이고 값은 헤더 값입니다. Atlas Stream Processing은 다음 유형의 헤더 값을 지원합니다.

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

객체 | 문자열 | 표현식

옵션

Apache Kafka 메시지 키로 평가되는 표현식입니다.

config.key를 지정하는 경우 config.keyFormat을 지정해야 합니다.

config.keyFormat

문자열

조건부

Apache Kafka 키 데이터를 직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

기본값은 binData입니다. config.key를 지정하는 경우 config.keyFormat을 지정해야 합니다. 문서의 config.key가 지정된 데이터 타입으로 성공적으로 직렬화되지 않으면 Atlas Stream Processing은 이를 데드 레터 큐로 보냅니다.

config.outputFormat

문자열

옵션

Apache Kafka 로 메시지를 보낼 때 사용할 JSON 형식입니다.. 다음 값 중 하나여야 합니다.

  • "relaxedJson"

  • "canonicalJson"

기본값은 "relaxedJson"입니다.

config.tombstoneWhen

표현식

옵션

null 을 Kafka 로 내보낼 시기를 결정하는 표현식입니다. 표현식 부울 true 또는 false로 평가되어야 합니다. 특정 문서 에 대해 표현식 true 로 평가되면 Atlas Stream Processing 대신 Kafka 싱크로 null 를 방출합니다. 표현식 $emit false로 평가되면 Atlas Stream Processing 문서 내보냅니다.

표현식 부울 값으로 평가되지 않거나 평가할 수 없는 경우 Atlas Stream Processing 문서 DLQ에 씁니다.

$emit.config.key$emit.config.keyFormat 값을 제공하는 경우 이 설정을 사용하여 주제 압축 활성화 수 있습니다. 이러한 값을 제공하지 않으면 이 표현식 true로 평가될 때 Atlas Stream Processing 여전히 null 를 방출하지만, 이로 인해 Kafka 주제 압축 트리거하다 되지는 않습니다.

처리된 데이터를 Atlas time series 컬렉션에 쓰려면 다음 프로토타입 형식의 $emit 파이프라인 단계를 사용합니다.

{
"$emit": {
"connectionName": "<registered-connection>",
"db": "<target-db>" | <expression>,
"coll": "<target-coll>" | <expression>,
"timeseries": {
<options>
}
}
}

$emit 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

필수 사항

데이터를 수집할 연결의 이름은 연결 레지스트리에 나타나는 대로입니다.

db

문자열 | 표현식

필수 사항

대상 time series 컬렉션 이 포함된 Atlas 데이터베이스 의 이름 또는 표현식 입니다.

coll

문자열 | 표현식

필수 사항

쓰기 (write) Atlas time series 컬렉션 의 이름 또는 표현식 입니다.

timeseries

문서

필수 사항

컬렉션의 time series 필드 를 정의하는 문서입니다.

참고

time series 컬렉션 내 문서의 최대 크기는 4 MB입니다. 자세한 내용은 Time Series 컬렉션 제한 사항을 참조하세요.

처리된 데이터를 AWS S3 버킷 싱크 연결에 기록하려면, $emit 파이프라인 단계를 사용하고 아래의 프로토타입 형식을 따릅니다.

{
"$emit": {
"connectionName": "<registered-connection>",
"bucket": "<target-bucket>",
"region": "<target-region>",
"path": "<key-prefix>" | <expression>,
"config": {
"writeOptions": {
"count": <doc-count>,
"bytes": <threshold>,
"interval": {
"size": <unit-count>,
"unit": "<time-denomination>"
}
},
"delimiter": "<delimiter>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
"compression": "gzip" | "snappy",
"compressionLevel": <level>
}
}
}

$emit 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

필수 사항

데이터를 쓰기 위한 연결의 이름은 연결 레지스트리에 나타나는 대로입니다.

bucket

문자열

필수 사항

데이터를 쓰기할 S3 버킷의 이름입니다.

region

문자열

옵션

대상 버킷이 위치한 AWS 리전의 이름입니다. 스트림 처리 인스턴스를 AWS 리전에 호스팅하면, 이 매개변수는 기본적으로 해당 리전으로 설정됩니다. 그 외의 경우, 스트림 처리 인스턴스 호스트 리전과 가장 가까운 AWS 리전이 기본값으로 설정됩니다.

path

문자열 | 표현식

필수 사항

S3 버킷에 기록된 객체 키의 접두사입니다. 리터럴 접두사 문자열이거나 문자열로 평가되는 표현식 이어야 합니다.

config

문서

옵션

기본값을 재정의하는 추가 매개변수가 포함된 문서입니다.

config.writeOptions

문서

옵션

쓰기 동작을 제어하는 추가 매개변수가 포함된 문서입니다. 이러한 매개변수는 어떤 임계값이 먼저 충족되는지에 따라 쓰기 동작을 트리거합니다.

예를 들어, 수집된 문서가 config.writeOptions.count 임계값에 도달했지만 config.writeOptions.interval 임계값에는 도달하지 않은 경우 스트림 프로세서는 config.writeOptions.count 임계값에 따라 이러한 문서를 S3로 계속 내보냅니다.

config.writeOptions.count

integer

옵션

S3에 기록되는 각 파일에 그룹화할 문서 수입니다.

config.writeOptions.bytes

integer

옵션

파일이 S3에 기록되기 전까지 누적되어야 하는 최소 바이트 수를 지정합니다. 바이트 수는 파이프라인에서 수집된 BSON 문서의 크기에 따라 결정되며, 최종 출력 파일의 크기는 고려되지 않습니다.

config.writeOptions.interval

문서

옵션

sizeunits의 조합으로 문서 대량 쓰기를 위한 타이머를 지정합니다.

기본값은 1분입니다. unit에 대해 size를 0으로 설정할 수 없습니다. 최대 간격은 7일입니다.

config.writeOptions.interval.size

integer

조건부

writeOptions.interval.units로 지정된 단위 수가 경과하면, 스트림 프로세서가 S3에 문서를 대량으로 기록합니다.

기본값은 1입니다. size를 0으로 설정할 수 없습니다. writeOptions.interval를 정의하면 이 매개변수도 반드시 정의해야 합니다.

config.writeOptions.interval.units

문자열

조건부

대량 쓰기 타이머를 측정할 시간의 단위입니다. 이 매개변수는 다음 값을 지원합니다.

  • ms

  • second

  • minute

  • hour

  • day

기본값은 minute입니다. writeOptions.interval을 정의하면 이 매개변수도 반드시 정의해야 합니다.

config.delimiter

문자열

옵션

내보낸 파일의 각 항목 사이의 구분자입니다.

기본값은 \n입니다.

config.outputFormat

문자열

옵션

S3에 기록된 JSON의 출력 형식을 지정합니다. 다음 값 중 하나여야 합니다.

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

기본값은 "relaxedJson"입니다.

자세한 내용을 보려면 기본 JSON을 참조하세요.

config.dateFormat

문자열

옵션

날짜 값의 날짜 형식입니다. 유효한 값은 다음과 같습니다.

  • default - outputFormat의 기본값을 사용합니다.

  • ISO8601 - 날짜를 ISO8601 형식의 문자열로 변환하며, 여기에는 밀리초 정밀도(YYYY-MM-DDTHH:mm:ss.sssZ)가 포함됩니다.

예를 들어, 파이프라인에 다음 기록을 추가하는 경우:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

만약 $emit.config.dateFormatdefault로 설정된 경우 출력은 다음과 유사하게 나타납니다.

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

$emit.config.dateFormatISO8601로 설정된 경우 출력은 다음과 유사하게 표시됩니다.

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

문자열

옵션

사용할 압축 알고리즘의 이름입니다. 다음 값 중 하나여야 합니다.

  • "gzip"

  • "snappy"

config.compressionLevel

문자열

조건부

내보내는 메시지에 적용할 압축 단계입니다. 1-9를 포함하는 값을 지원하며, 값이 클수록 압축률이 더 높아집니다.

기본값은 6입니다.

이 매개변수는 gzip에만 필요하며 해당 방식에만 적용됩니다. config.compressionsnappy로 설정하면 이 매개변수를 설정해도 효과가 없습니다.

메시지 수집을 용이하게 하기 위해 Atlas Stream Processing은 기본 JSON을 지원하며, 이는 RelaxedJSON 형식을 간소화한 것입니다. 다음 표는 해당 필드들에 적용된 이러한 간소화 예시를 보여줍니다.

필드 유형
relaxedJson
basicJson

바이너리

{ "binary": { "$binary": { "base64": "gf1UcxdHTJ2HQ/EGQrO7mQ==", "subType": "00" }}}

{ "binary": "gf1UcxdHTJ2HQ/EGQrO7mQ=="}

날짜

{ "date": { "$date": "2024-10-24T18:07:29.636Z"}}

{ "date": 1729625275856}

10진수

{ "decimal": { "$numberDecimal": "9.9" }}

{ "decimal": "9.9" }

타임스탬프

{ "timestamp": { "$timestamp": { "t": 1729793249, "i": 1 }}}

{ "timestamp": 1729793249000}

ObjectId

{ "_id": { "$oid": "671a8ce1497407eff0e17cba" }}

{ "_id": "6717fcbba18c8a8f74b6d977" }

음의 무한대

{ "negInf": { "$numberDouble": "-Infinity" }}

{ "negInf": "-Infinity" }

양의 무한대

{ "posInf": { "$numberDouble": "Infinity" }}

{ "posInf": "Infinity" }

정규 표현식

{ "regex": { "$regularExpression": { "pattern": "ab+c", "options": "i" }}}

{ "regex": { "pattern": "ab+c", "options": "i" }}

UUID

{ "uuid": { "$binary": { "base64": "Kat+fHk6RkuAmotUmsU7gA==", "subType": "04" }}}

{ "uuid": "420b7ade-811a-4698-aa64-c8347c719cf1"}

$emit 가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $emit 단계는 하나만 사용할 수 있습니다.

스트림 프로세서당 하나의 Atlas time series 컬렉션에만 쓸 수 있습니다. 존재하지 않는 컬렉션을 지정하면 Atlas는 사용자가 지정한 time series 필드로 컬렉션을 생성합니다. 기존 데이터베이스를 지정해야 합니다.

동적 표현식 topic, dbcoll 필드의 값으로 사용하여 스트림 프로세서가 메시지별로 다른 대상에 쓰기 (write) 수 활성화 . 표현식 문자열로 평가되어야 합니다.

예시

다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

이러한 각 항목을 고유한 Apache Kafka 주제 로 정렬하려면 다음 $emit 단계를 쓰기 (write) 수 있습니다.

{
"$emit": {
"connectionName": "kafka1",
"topic": "$customerStatus"
}
}

$emit 단계는 다음과 같습니다.

  • VIP 이라는 주제에 Very Important Industries 메시지를 씁니다.

  • employee 이라는 주제에 N. E. Buddy 메시지를 씁니다.

  • contractor 이라는 주제에 Khan Traktor 메시지를 씁니다.

동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.

아직 존재하지 않는 주제 지정하면Apache Kafka 해당 주제를 대상으로 하는 첫 번째 메시지를 받을 때 자동으로 주제 생성합니다.

동적 표현식으로 주제를 지정했지만 Atlas Stream Processing이 지정된 메시지에 대한 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 해당 메시지를 데드 레터 큐로 보내고 후속 메시지를 처리합니다. 데드 레터 큐가 구성되어 있지 않은 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. 단계는 $source 이라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을 my_weatherdata 설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을 재정의하여 ingestionTime로 설정합니다.

  2. $match 단계에서는 airTemperature.value 값이 30.0 이상인 문서를 제외하고 airTemperature.value 값이 30.0 미만인 문서를 다음 단계로 전달합니다.

  3. $addFields 단계는 스트림을 메타데이터로 풍부하게 합니다.

  4. $emit 단계에서는 weatherStreamOutput Kafka 브로커 연결을 통해 stream이라는 주제에 출력을 기록합니다.

{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata",
"tsFieldName": "ingestionTime"
}
},
{
"$match": {
"airTemperature.value": {
"$lt": 30
}
}
},
{
"$addFields": {
"processorMetadata": {
"$meta": "stream"
}
}
},
{
"$emit": {
"connectionName": "weatherStreamOutput",
"topic": "stream"
}
}

stream 주제의 문서는 다음 형식을 따릅니다.

{
"st": "x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8, 116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight": {
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime": {
"$date": "2024-09-26T17:34:41.843Z"
},
"_stream_meta": {
"source": {
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

돌아가기

$tumblingWindow

이 페이지의 내용