Docs Menu
Docs Home
/
Atlas
/ /

$merge (스트림 처리)

$merge 단계는 메시지를 쓸 연결을 연결 레지스트리에서 명시합니다. 연결은 Atlas 연결이어야 합니다.

$merge 파이프라인 단계의 프로토타입 형식은 다음과 같습니다.

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>",
"whenNotMatched": "insert | discard | expression",
"parallelism": <integer>
}
}

Atlas Stream Processing 버전의 $merge는 대부분 Atlas Data Federation 버전과 동일한 필드를 사용합니다. Atlas Stream Processing은 또한 $merge의 구현에 고유하거나 이를 위해 수정된 다음 필드를 사용합니다. Atlas Data Federation $merge와 공유되는 필드에 대해 자세히 알아보려면 $merge 구문을 참조하세요.

필드
필요성
설명

into

필수 사항

Atlas Stream Processing이 Atlas 연결에만 $merge를 지원한다는 점을 고려하여 간소화되었습니다.

자세한 내용은 Atlas Data Federation $merge 필드에 대한 설명을 참조하세요.

whenMatched

옵션

Atlas Data Federation $merge 단계와 비교하여 "delete" 및 동적 표현식을 지원하여 기능을 확장합니다.

"delete"로 설정하면 Atlas는 조건에 맞는 모든 메시지를 대상 Atlas 컬렉션에서 삭제합니다.

동적 표현식 값을 사용하는 경우, 해당 값은 다음 문자열 중 하나로 리졸브되어야 합니다.

  • "merge"

  • "replace"

  • "keepExisting"

  • "delete"

whenNotMatched

옵션

Atlas Data Federation $merge 단계에 비해 동적 표현식을 지원하여 기능을 확장합니다.

동적 표현식 값을 사용하는 경우, 해당 값은 다음 문자열 중 하나로 리졸브되어야 합니다.

  • "insert"

  • "discard"

  • "expression"

parallelism

조건부

쓰기 작업을 분산할 스레드의 수입니다. 1에서 16 사이의 정수 값이어야 합니다. 병렬 처리 값이 높을수록 처리량이 증가합니다. 그러나 더 높은 값은 스트림 프로세서와 이를 기록하는 클러스터가 더 많은 컴퓨팅 리소스를 사용해야 합니다.

into.coll 또는 into.db에 동적 표현식 값을 사용하는 경우 이 값을 1보다 크게 설정할 수 없습니다.

$merge 가 표시되는 파이프라인의 마지막 단계여야 합니다. 파이프라인당 $merge 단계는 하나만 사용할 수 있습니다.

샤딩된 컬렉션에 대해 $merge를 사용할 때에는 on 필드에 특별한 요구 사항이 있습니다. 자세한 내용은 $merge 구문을 참조하세요.

into.coll 또는 into.db에 동적 표현식 값을 사용하는 경우 parallelism 값을 1보다 크게 설정할 수 없습니다.

$merge time series 컬렉션에 쓸 수 없습니다. Time series 컬렉션에 문서를 쓰려면 $emit 단계를 이용하세요.

샤딩된 컬렉션에서 $merge 를 사용하려면 Atlas 관리자 역할 있어야 합니다.

다음 필드의 값으로 동적 표현식을 사용할 수 있습니다.

  • into.db

  • into.coll

이를 통해 스트림 프로세서는 메시지별로 다양한 대상 Atlas collection에 메시지를 쓸 수 있습니다.

예시

다음 형식의 메시지를 생성하는 트랜잭션 이벤트 스트림이 있습니다.

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

이러한 각 항목을 고유한 Atlas 데이터베이스와 collection으로 정렬하려면 다음과 같은 $merge 단계를 작성할 수 있습니다.

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}
}

$merge 단계는 다음과 같습니다.

  • VIP.subscription 이라는 Atlas collection에 Very Important Industries 메시지를 씁니다.

  • employee.requisition 이라는 Atlas collection에 N. E. Buddy 메시지를 씁니다.

  • contractor.billableHours 이라는 Atlas collection에 Khan Traktor 메시지를 씁니다.

문자열로 평가되는 동적 표현식만 사용할 수 있습니다. 동적 표현식에 대한 자세한 내용은 표현식 연산자를 참조하세요.

동적 표현식으로 데이터베이스 또는 컬렉션을 지정했지만 Atlas Stream Processing이 지정된 메시지에 대한 표현식을 평가할 수 없는 경우, Atlas Stream Processing은 구성된 경우 해당 메시지를 데드 레터 큐로 보내고 후속 메시지를 처리합니다. 데드 레터 큐가 구성되어 있지 않은 경우 Atlas Stream Processing은 메시지를 완전히 건너뛰고 후속 메시지를 처리합니다.

여러 Apache Kafka 주제의 스트리밍 데이터를 Atlas 클러스터의 컬렉션에 저장하려면 $merge 단계와 $source 단계를 사용하세요. $source 단계는 데이터를 읽을 주제를 지정합니다. $merge 단계는 데이터를 대상 컬렉션에 씁니다.

다음 구문을 사용합니다.

{
"$source": {
"connectionName": "<registered-kafka-connection>",
"topic": [ "<topic-name-1>", "<topic-name-2>", ... ]
}
},
{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
}
},
...
}

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. 단계는 $source 이라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을 my_weatherdata 설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을 재정의하여 ingestionTime로 설정합니다.

  2. $match 단계에서는 dewPoint.value 값이 5.0 미만인 문서를 제외하고 dewPoint.value 값이 5.0 보다 큰 문서를 다음 단계로 전달합니다.

  3. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

$merge.whenMatched$merge.whenNotMatched 매개변수를 사용하여 작업 유형에 따라 변경 스트림 이벤트의 효과를 복제할 수 있습니다.

다음 집계에는 4단계가 있습니다.

  1. $source 단계는 atlas1 연결을 통해 Atlas 클러스터의 db1.coll1 컬렉션에 연결을 설정합니다.

  2. $addFields 단계는 수집된 문서에 각 문서의 "$operationType 값과 "delete"의 동등성 검사 결과로 설정된 fullDocument._isDelete 필드를 추가하여 보강합니다. 이 등식은 불리언으로 평가됩니다.

  3. $replaceRoot 단계에서는 문서를 강화된 $fullDocument 필드의 값으로 대체합니다.

  4. $merge 단계는 atlas2 연결을 통해 db1.coll1에 문서를 쓰고 각 문서에 대해 두 가지 검사를 수행합니다.

    • 먼저 whenMatched 필드는 on이 명시적으로 설정되지 않았으므로 기본 일치 필드인 _id를 사용하여 문서가 db1.coll1 컬렉션의 기존 문서와 일치하는지 확인합니다. fullDocument._isDeletetrue로 설정된 경우, Atlas는 일치하는 문서를 삭제합니다. 일치하는 경우 및 fullDocument._isDeletefalse로 설정된 경우, Atlas는 일치하는 문서를 스트리밍 데이터 소스의 새 문서로 교체합니다.

    • 둘째, Atlas Stream Processing이 일치하는 문서를 찾지 못하고 fullDocument._isDelete가 true인 경우, Atlas는 문서를 컬렉션에 쓰지 않고 삭제합니다. 일치하는 문서가 없고 fullDocument._isDelete가 false인 경우, Atlas는 스트리밍 데이터 소스의 문서를 컬렉션에 삽입합니다.

{
$source: {
connectionName: “atlas1”,
db: “db1”,
coll: “coll1”,
fullDocument: “required”
}
},
{
$addFields: {
“fullDocument._isDelete”: {
$eq: [
“$operationType”,
“delete”
]
}
}
},
{
$replaceRoot: {
newRoot: “$fullDocument”
}
},
{
$merge: {
into: {
connectionName: “atlas2”,
db: “db1”,
coll: “coll1”
},
whenMatched: {
$cond: {
if: “$_isDelete”,
then: “delete”,
else: “replace”
}
},
whenNotMatched: {
$cond: {
if: “$_isDelete”,
then: “discard”,
else: “insert”
}
},
}
}

돌아가기

$emit

이 페이지의 내용