정의
$merge 단계는 메시지를 쓸 연결을 연결 레지스트리에서 명시합니다. 연결은 Atlas 연결이어야 합니다.
$merge 파이프라인 단계의 프로토타입 형식은 다음과 같습니다.
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>", "whenNotMatched": "insert | discard | expression", "parallelism": <integer> } }
구문
Atlas Stream Processing 버전의 $merge는 대부분 Atlas Data Federation 버전과 동일한 필드를 사용합니다. Atlas Stream Processing은 또한 $merge의 구현에 고유하거나 이를 위해 수정된 다음 필드를 사용합니다. Atlas Data Federation $merge와 공유되는 필드에 대해 자세히 알아보려면 $merge 구문을 참조하세요.
필드 | 필요성 | 설명 |
|---|---|---|
| 필수 사항 | Atlas Stream Processing이 Atlas 연결에만 자세한 내용은 Atlas Data Federation |
| 옵션 | Atlas Data Federation
동적 표현식 값을 사용하는 경우, 해당 값은 다음 문자열 중 하나로 리졸브되어야 합니다.
|
| 옵션 | Atlas Data Federation 동적 표현식 값을 사용하는 경우, 해당 값은 다음 문자열 중 하나로 리졸브되어야 합니다.
|
| 조건부 | 쓰기 작업을 분산할 스레드의 수입니다.
|
행동
제한 사항
$merge 가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $merge 단계는 하나만 사용할 수 있습니다.
샤딩된 컬렉션에 대해 $merge를 사용할 때에는 on 필드에 특별한 요구 사항이 있습니다. 자세한 내용은 $merge 구문을 참조하세요.
into.coll 또는 into.db에 동적 표현식 값을 사용하는 경우 parallelism 값을 1보다 크게 설정할 수 없습니다.
$merge time series 컬렉션에 쓸 수 없습니다. Time series 컬렉션에 문서를 쓰려면 $emit 단계를 이용하세요.
샤딩된 컬렉션에서 $merge 를 사용하려면 Atlas 관리자 역할 있어야 합니다.
동적 표현식
다음 필드의 값으로 동적 표현식을 사용할 수 있습니다.
into.dbinto.coll
이를 통해 스트림 프로세서는 메시지별로 다양한 대상 Atlas collection에 메시지를 쓸 수 있습니다.
예시
다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
이러한 각 항목을 고유한 Atlas 데이터베이스와 collection으로 정렬하려면 다음과 같은 $merge 단계를 작성할 수 있습니다.
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" } }
이 $merge 단계는 다음과 같습니다.
VIP.subscription이라는 Atlas collection에Very Important Industries메시지를 씁니다.employee.requisition이라는 Atlas collection에N. E. Buddy메시지를 씁니다.contractor.billableHours이라는 Atlas collection에Khan Traktor메시지를 씁니다.
문자열로 평가되는 동적 표현식만 사용할 수 있습니다. 동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.
동적 표현식으로 데이터베이스 또는 컬렉션을 지정했지만 Atlas Stream Processing이 지정된 메시지에 대한 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 해당 메시지를 데드 레터 큐로 보내고 후속 메시지를 처리합니다. 데드 레터 큐가 구성되어 있지 않은 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.
Kafka 주제의 데이터 저장
여러 Apache Kafka 주제의 스트리밍 데이터를 Atlas 클러스터의 컬렉션에 저장하려면 $merge 단계와 $source 단계를 사용하세요. $source 단계는 데이터를 읽을 주제를 지정합니다. $merge 단계는 데이터를 대상 컬렉션에 씁니다.
다음 구문을 사용합니다.
{ "$source": { "connectionName": "<registered-kafka-connection>", "topic": [ "<topic-name-1>", "<topic-name-2>", ... ] } }, { "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> } }, ... }
예시
기본 예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
단계는
$source이라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을my_weatherdata설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을 재정의하여ingestionTime로 설정합니다.$match단계에서는dewPoint.value값이5.0미만인 문서를 제외하고dewPoint.value값이5.0보다 큰 문서를 다음 단계로 전달합니다.$merge단계는sample_weatherstream데이터베이스의stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.
변경 스트림 이벤트 복제
$merge.whenMatched 및 $merge.whenNotMatched 매개변수를 사용하여 작업 유형에 따라 변경 스트림 이벤트의 효과를 복제할 수 있습니다.
다음 집계에는 4단계가 있습니다.
$source단계는atlas1연결을 통해 Atlas 클러스터의db1.coll1컬렉션에 연결을 설정합니다.$addFields단계는 수집된 문서에 각 문서의"$operationType값과"delete"의 동등성 검사 결과로 설정된fullDocument._isDelete필드를 추가하여 보강합니다. 이 등식은 불리언으로 평가됩니다.$replaceRoot단계에서는 문서를 강화된$fullDocument필드의 값으로 대체합니다.$merge단계는atlas2연결을 통해db1.coll1에 문서를 쓰고 각 문서에 대해 두 가지 검사를 수행합니다.먼저
whenMatched필드는on이 명시적으로 설정되지 않았으므로 기본 일치 필드인_id를 사용하여 문서가db1.coll1컬렉션의 기존 문서와 일치하는지 확인합니다.fullDocument._isDelete가true로 설정된 경우, Atlas는 일치하는 문서를 삭제합니다. 일치하는 경우 및fullDocument._isDelete가false로 설정된 경우, Atlas는 일치하는 문서를 스트리밍 데이터 소스의 새 문서로 교체합니다.둘째, Atlas Stream Processing이 일치하는 문서를 찾지 못하고
fullDocument._isDelete가 true인 경우, Atlas는 문서를 컬렉션에 쓰지 않고 삭제합니다. 일치하는 문서가 없고fullDocument._isDelete가 false인 경우, Atlas는 스트리밍 데이터 소스의 문서를 컬렉션에 삽입합니다.
{ $source: { connectionName: “atlas1”, db: “db1”, coll: “coll1”, fullDocument: “required” } }, { $addFields: { “fullDocument._isDelete”: { $eq: [ “$operationType”, “delete” ] } } }, { $replaceRoot: { newRoot: “$fullDocument” } }, { $merge: { into: { connectionName: “atlas2”, db: “db1”, coll: “coll1” }, whenMatched: { $cond: { if: “$_isDelete”, then: “delete”, else: “replace” } }, whenNotMatched: { $cond: { if: “$_isDelete”, then: “discard”, else: “insert” } }, } }