Docs Menu
Docs Home
/ /
/ / /

$setStreamMeta 애그리게이션 단계(스트림 처리)

$setStreamMeta

$setStreamMeta 단계에서는 수신 문서에 대한 메타데이터 필드를 설정합니다. 이 메타데이터 사용하면 문서 자체의 내용을 변경하지 않고도 특정 값을 기반으로 문서에 대한 선택적 작업을 수행할 수 있습니다.

$setStreamMeta 단계의 프로토타입 형식은 다음과 같습니다.

{
$setStreamMeta: {
"<metadata-field>": <expression>
}
}

$setStreamMeta 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

<metadata-field>

표현식

필수 사항

문서에 적용 메타데이터 필드 . 키와 값을 모두 설정하다 해야 합니다.

  • 키는 stream.로 시작하는 문자열이어야 합니다.

  • 값은 표현식 이어야 합니다.

표현식 평가가 실패하면 Atlas Stream Processing 문서 DLQ로 보냅니다.

자세한 학습 은 예제를 참조하세요.

은 단계 이후 $emit $setStreamMeta 또는 $source $merge 단계 앞에 와야 합니다.

스트리밍 데이터 소스 전자상거래 플랫폼을 통해 이루어진 각 주문에 대해 문서 생성합니다. 문서의 형식은 다음과 같습니다.

{
orderId: 1,
productId: "A",
qty: 2
}

Atlas 데이터베이스 다음 형식의 문서가 포함된 products 컬렉션 포함되어 있습니다.

[
{_id: "A", name: "Laptop", category: "tech"},
{_id: "B", name: "Shirt", category: "clothing"}
]

다음 집계 6단계로 구성됩니다.

  1. 단계에서는 전자상거래 플랫폼에서 주문 문서를 수집하여 수집되는 각 기록 후속 집계 단계에 $source 노출합니다.

  2. $lookup 단계에서는 들어오는 주문 문서를 products shop 필드 의 데이터베이스 에 있는 컬렉션 _id 에 결합합니다. 결과 배열 필드 이름은 product 입니다.

  3. $unwind 단계에서는 의 단일 요소 배열 값을 product 문서 로 바꿉니다.

  4. $setStreamMeta 단계에서는 stream.coll product.category라는 메타데이터 필드 값과 동일하게 설정합니다.

  5. $unset 단계에서는 product 필드 제거하고 소스 문서 원래 형식으로 되돌립니다.

  6. $merge 단계는 값에 의해 동적으로 결정되는 컬렉션 으로 문서 stream.coll 병합합니다.

{
$source: {
documents: [
{
orderId: 1,
productId: "A",
qty: 2
},
{
orderId: 2,
productId: "B",
qty: 1
},
{
orderId: 3,
productId: "A",
qty: 5
}
]
}
},
{
$lookup: {
from: {
connectionName: "atlas",
db: "shop",
coll: "products"
},
localField: "productId",
foreignField: "_id",
as: "product"
}
},
{
$unwind: "$product"
},
{
$setStreamMeta: {
"stream.coll": "$product.category"
}
},
{
$unset: [
"product"
]
},
{
$merge: {
into: {
connectionName: "atlas",
db: "shop",
coll: {
$meta: "stream.coll"
}
}
}
}

돌아가기

$sessionWindow

이 페이지의 내용