정의
$merge 단계는 메시지를 쓸 연결을 연결 레지스트리에서 명시합니다. 연결은 Atlas 연결이어야 합니다.
$merge
파이프라인 단계의 프로토타입 형식은 다음과 같습니다.
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>", "whenNotMatched": "insert | discard | expression", "parallelism": <integer> } }
구문
Atlas Stream Processing 버전의 $merge는 대부분 Atlas Data Federation 버전과 동일한 필드를 사용합니다. Atlas Stream Processing은 또한 $merge
의 구현에 고유하거나 이를 위해 수정된 다음 필드를 사용합니다. Atlas Data Federation $merge
와 공유되는 필드에 대해 자세히 알아보려면 $merge 구문을 참조하세요.
필드 | 필요성 | 설명 |
---|---|---|
| 필수 사항 | Atlas Stream Processing이 Atlas 연결에만 자세한 내용은 Atlas Data Federation |
| 옵션 | Atlas Data Federation
동적 표현식 값을 사용하는 경우, 해당 값은 다음 문자열 중 하나로 리졸브되어야 합니다.
|
| 옵션 | Atlas Data Federation 동적 표현식 값을 사용하는 경우, 해당 값은 다음 문자열 중 하나로 리졸브되어야 합니다.
|
| 조건부 | 쓰기 작업을 분산할 스레드의 수입니다.
|
행동
제한 사항
$merge
가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $merge
단계는 하나만 사용할 수 있습니다.
샤딩된 컬렉션에 대해 $merge
를 사용할 때에는 on
필드에 특별한 요구 사항이 있습니다. 자세한 내용은 $merge 구문을 참조하세요.
into.coll
또는 into.db
에 동적 표현식 값을 사용하는 경우 parallelism
값을 1
보다 크게 설정할 수 없습니다.
$merge
time series 컬렉션에 쓸 수 없습니다. Time series 컬렉션에 문서를 쓰려면 $emit 단계를 이용하세요.
샤딩된 컬렉션에서 $merge
를 사용하려면 Atlas 관리자 역할 있어야 합니다.
동적 표현식
다음 필드의 값으로 동적 표현식을 사용할 수 있습니다.
into.db
into.coll
이를 통해 스트림 프로세서는 메시지별로 다양한 대상 Atlas collection에 메시지를 쓸 수 있습니다.
예시
다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
이러한 각 항목을 고유한 Atlas 데이터베이스와 collection으로 정렬하려면 다음과 같은 $merge
단계를 작성할 수 있습니다.
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" } }
이 $merge
단계는 다음과 같습니다.
VIP.subscription
이라는 Atlas collection에Very Important Industries
메시지를 씁니다.employee.requisition
이라는 Atlas collection에N. E. Buddy
메시지를 씁니다.contractor.billableHours
이라는 Atlas collection에Khan Traktor
메시지를 씁니다.
문자열로 평가되는 동적 표현식만 사용할 수 있습니다. 동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.
동적 표현식으로 데이터베이스 또는 컬렉션을 지정했지만 Atlas Stream Processing이 지정된 메시지에 대한 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 해당 메시지를 데드 레터 큐로 보내고 후속 메시지를 처리합니다. 데드 레터 큐가 구성되어 있지 않은 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.
Kafka 주제의 데이터 저장
여러 Apache Kafka 주제의 스트리밍 데이터를 Atlas 클러스터의 컬렉션에 저장하려면 $merge
단계와 $source
단계를 사용하세요. $source
단계는 데이터를 읽을 주제를 지정합니다. $merge
단계는 데이터를 대상 컬렉션에 씁니다.
다음 구문을 사용합니다.
{ "$source": { "connectionName": "<registered-kafka-connection>", "topic": [ "<topic-name-1>", "<topic-name-2>", ... ] } }, { "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> } }, ... }
예시
기본 예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
단계는
$source
이라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을my_weatherdata
설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을 재정의하여ingestionTime
로 설정합니다.$match
단계에서는dewPoint.value
값이5.0
미만인 문서를 제외하고dewPoint.value
값이5.0
보다 큰 문서를 다음 단계로 전달합니다.$merge
단계는sample_weatherstream
데이터베이스의stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.
변경 스트림 이벤트 복제
$merge.whenMatched
및 $merge.whenNotMatched
매개변수를 사용하여 작업 유형에 따라 변경 스트림 이벤트의 효과를 복제할 수 있습니다.
다음 집계에는 4단계가 있습니다.
$source
단계는atlas1
연결을 통해 Atlas 클러스터의db1.coll1
컬렉션에 연결을 설정합니다.$addFields
단계는 수집된 문서에 각 문서의"$operationType
값과"delete"
의 동등성 검사 결과로 설정된fullDocument._isDelete
필드를 추가하여 보강합니다. 이 등식은 불리언으로 평가됩니다.$replaceRoot
단계에서는 문서를 강화된$fullDocument
필드의 값으로 대체합니다.$merge
단계는atlas2
연결을 통해db1.coll1
에 문서를 쓰고 각 문서에 대해 두 가지 검사를 수행합니다.먼저
whenMatched
필드는on
이 명시적으로 설정되지 않았으므로 기본 일치 필드인_id
를 사용하여 문서가db1.coll1
컬렉션의 기존 문서와 일치하는지 확인합니다.fullDocument._isDelete
가true
로 설정된 경우, Atlas는 일치하는 문서를 삭제합니다. 일치하는 경우 및fullDocument._isDelete
가false
로 설정된 경우, Atlas는 일치하는 문서를 스트리밍 데이터 소스의 새 문서로 교체합니다.둘째, Atlas Stream Processing이 일치하는 문서를 찾지 못하고
fullDocument._isDelete
가 true인 경우, Atlas는 문서를 컬렉션에 쓰지 않고 삭제합니다. 일치하는 문서가 없고fullDocument._isDelete
가 false인 경우, Atlas는 스트리밍 데이터 소스의 문서를 컬렉션에 삽입합니다.
{ $source: { connectionName: “atlas1”, db: “db1”, coll: “coll1”, fullDocument: “required” } }, { $addFields: { “fullDocument._isDelete”: { $eq: [ “$operationType”, “delete” ] } } }, { $replaceRoot: { newRoot: “$fullDocument” } }, { $merge: { into: { connectionName: “atlas2”, db: “db1”, coll: “coll1” }, whenMatched: { $cond: { if: “$_isDelete”, then: “delete”, else: “replace” } }, whenNotMatched: { $cond: { if: “$_isDelete”, then: “discard”, else: “insert” } }, } }